
Comments (3)

colinmjj commented on April 28, 2024

@wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of the shuffle read path, and multiple replicas should be supported next, building on that refactoring.


wangyi2021 commented on April 28, 2024

@wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of the shuffle read path, and multiple replicas should be supported next, building on that refactoring.

Case 4 was tested with the following configuration:
spark.rss.data.replica = 2
spark.rss.storage.type = LOCALFILE
Result: the application did not re-run, and its execution time was the same as when no RSS server was abnormal.
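
For reference, here is a minimal sketch of how these settings can be applied programmatically through SparkConf. The spark.rss.* keys are the ones from the test above, and the shuffle manager class name matches the org.apache.spark.shuffle.RssShuffleManager that appears in the executor log below; the exact wiring (spark-defaults.conf, --conf flags, etc.) depends on the deployment.

import org.apache.spark.SparkConf;

public class Case4Conf {
    public static void main(String[] args) {
        // Replica and storage settings from the test case above.
        // The shuffle manager class is the RssShuffleManager visible in the log;
        // adjust it if your deployment uses a different class.
        SparkConf conf = new SparkConf()
                .setAppName("rss-replica-case4")
                .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
                .set("spark.rss.data.replica", "2")
                .set("spark.rss.storage.type", "LOCALFILE");
        System.out.println(conf.toDebugString());
    }
}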

Task 195 error log (shuffle fetch for partition 94 data):

2021-12-27 11:35:09,568 | INFO  | [dispatcher-Executor] | Got assigned task 195 | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,568 | INFO  | [Executor task launch worker for task 195] | Running task 94.0 in stage 1.0 (TID 195) | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,581 | INFO  | [Executor task launch worker for task 195] | Get taskId cost 1 ms, and request expected blockIds from 100 tasks for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReader(RssShuffleManager.java:290)
2021-12-27 11:35:09,581 | WARN  | [Executor task launch worker for task 195] | Get shuffle result is failed from ShuffleServerInfo{id[xx.xx.xx.rss-abnormal], host[xx.xx.xx.rss-abnormal], port[19999]} for appId[application_id_20211227], shuffleId[0] | com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleResult(ShuffleWriteClientImpl.java:313)
2021-12-27 11:35:09,583 | INFO  | [Executor task launch worker for task 195] | Get shuffle blockId cost 2 ms, and get 150 blockIds for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReaderImpl(RssShuffleManager.java:353)
2021-12-27 11:35:09,615 | INFO  | [Executor task launch worker for task 195] | Shuffle read started:appId=application_id_20211227, shuffleId=0,taskId=195_0, partitions: [94, 95), maps: [0, 2147483647) | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:116)
2021-12-27 11:35:09,616 | INFO  | [Executor task launch worker for task 195] | Inserting aggregated records to sorter | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:137)
2021-12-27 11:35:09,616 | WARN  | [Executor task launch worker for task 195] | Failed to read shuffle data with ShuffleServerGrpcClient for host[xx.xx.xx.rss-abnormal], port[19999] | com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:69)
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at com.tencent.rss.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.getShuffleData(ShuffleServerGrpc.java:607)
	at com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:422)
	at com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:64)
	at com.tencent.rss.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:180)
	at com.tencent.rss.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:119)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:97)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:211)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:200)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:139)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:352)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:316)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$6(Executor.scala:575)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1422)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:578)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /xx.xx.xx.rss-abnormal:19999
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
	at io.grpc.netty.shaded.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
	at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:243)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

It retries 3 times, and then the task succeeds (a sketch of this failover pattern follows after the server logs below).

2021-12-27 11:35:10,075 | INFO  | [Executor task launch worker for task 195] | GetShuffleData for appId[application_id_20211227], shuffleId[0], partitionId[94] cost 1 ms | com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:425)
2021-12-27 11:35:10,075 | INFO  | [Executor task launch worker for task 195] | Metrics for shuffleId[0], partitionId[94], read data cost 129 ms, copy data cost 4 ms, crc check cost 2 ms | com.tencent.rss.client.impl.ShuffleReadClientImpl.logStatics(ShuffleReadClientImpl.java:218)
2021-12-27 11:35:10,076 | INFO  | [Executor task launch worker for task 195] | Fetch 0 bytes cost 137 ms and 3 ms to serialize, 87 ms to decompress with unCompressionLength[105941276] | org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:126)
2021-12-27 11:35:10,076 | INFO  | [Executor task launch worker for task 195] | Inserted aggregated records to sorter: millis:460 | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:140)
2021-12-27 11:35:19,555 | INFO  | [Executor task launch worker for task 195] | Finished task 94.0 in stage 1.0 (TID 195). 1364 bytes result sent to driver | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)

In the rss-abnormal server log, there is no entry for this reduce task for partition 94.
In the backup RSS server, the log entries for partition 94 are:

[INFO] 2021-12-27 11:35:10,027 Grpc-454 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 4 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 4682982 bytes and 19 blocks
[INFO] 2021-12-27 11:35:10,075 Grpc-462 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 0 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 0 bytes and 0 blocks
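
What the logs above show amounts to client-side failover across replicas: the fetch against the dead server fails with Connection refused, and after a few retries the backup replica serves the data. Below is a simplified, hypothetical sketch of that pattern; the names (ReplicaReadSketch, fetchFromServer) are illustrative only and this is not the actual ShuffleReadClientImpl or LocalFileClientReadHandler code.

import java.util.List;

public class ReplicaReadSketch {

    static final int MAX_RETRIES = 3;

    // Try each replica in turn, up to MAX_RETRIES rounds; only fail the task
    // if every attempt against every replica is exhausted.
    static byte[] readWithFailover(List<String> replicaServers, int partitionId) {
        Exception lastError = null;
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            for (String server : replicaServers) {
                try {
                    // In the real client this is a gRPC getShuffleData call;
                    // here it is a placeholder that may throw, e.g. Connection refused.
                    return fetchFromServer(server, partitionId);
                } catch (Exception e) {
                    lastError = e;
                }
            }
        }
        throw new RuntimeException("All replicas failed for partition " + partitionId, lastError);
    }

    static byte[] fetchFromServer(String server, int partitionId) throws Exception {
        throw new UnsupportedOperationException("placeholder for a real RPC call");
    }
}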


colinmjj commented on April 28, 2024

@wangyi2021 For the shuffle write phase, we implemented marking a task as successful as soon as its data was written to any one shuffle server, but that change was reverted because many situations have to be handled to guarantee the data is correct. We already have some ideas for better replica support, and I hope they can be available next month.
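
For illustration, here is a minimal sketch of the two write-side policies being contrasted here, with hypothetical helper names (this is not the Firestorm implementation): the reverted approach corresponds to the "any replica" check, while requiring every replica to acknowledge is the conservative alternative.

import java.util.List;

public class ReplicaWritePolicy {

    // Reverted approach: one successful replica write is enough to mark the task successful.
    static boolean succeededWithAnyReplica(List<Boolean> replicaWriteAcks) {
        return replicaWriteAcks.stream().anyMatch(Boolean::booleanValue);
    }

    // Stricter alternative: every replica must confirm the write before the task succeeds.
    static boolean succeededWithAllReplicas(List<Boolean> replicaWriteAcks) {
        return !replicaWriteAcks.isEmpty()
                && replicaWriteAcks.stream().allMatch(Boolean::booleanValue);
    }
}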

