Git Product home page Git Product logo

Comments (5)

okdistribute avatar okdistribute commented on August 23, 2024 1

@serapath it's multifeed that's doing that, not cabal, just to be clear :)

from datdot-service.

serapath avatar serapath commented on August 23, 2024

kappa-core / multifeed / multiplex

multifeed - mux

MULTIPLEXER

/**
 * Annotated sketch of multifeed's Multiplexer: one instance per peer connection.
 * It exchanges feed keys with the remote over three extension messages
 * (manifest -> request -> replicate) and then replicates the agreed feeds.
 *
 * NOTE(review): this is a condensed sketch. `stream`, `extension`, `uniq`, `once`,
 * `debug`, the `EXT_*` / `MULTIFEED` / `PROTOCOL_VERSION` constants, `self.stream`,
 * `self._handshakeExt` and `self.emit` are not defined in this excerpt and are
 * presumably provided by the surrounding module — confirm against the real source.
 *
 * @param {boolean} isInitiator - forwarded to `feed.replicate(...)` for every feed
 * @param {*} key - protocol encryption key; the shared virtual feed is opened with it
 * @param {object} [opts] - `_id`, `userData`, and the `live` / `download` / `upload` /
 *   `encrypt` flags forwarded to `feed.replicate(...)`
 */
function Multiplexer (isInitiator, key, opts) { // `key` - protocol encryption key
  // Callers invoke this without `new`, so construct an instance on their behalf.
  if (!(this instanceof Multiplexer)) return new Multiplexer(isInitiator, key, opts)

  var self = this
  opts = opts || {}

  self._id = opts._id || Math.floor(Math.random() * 10000).toString(16)
  self._initiator = isInitiator // read when each feed starts replicating

  var LOCALOFFER          = [] // keys we offered to the remote
  var REQUESTED_FEEDS     = [] // keys we asked the remote for
  var REMOTE_OFFER        = [] // keys the remote offered to us
  var ACTIVE_FEED_STREAMS = {} // hex key -> replication stream currently in use

  // Open a virtual feed that has the key set to the shared key
  self._feed = stream.open(key, function onopen () {
    // ...
    self._handshakeExt.send({ client: MULTIFEED, version: PROTOCOL_VERSION, userData: opts.userData })
  })

  // PROTOCOL:

  // PEER A send OFFER
  // 1. CREATE & EMIT 'manifest', called by `multifeed: mux.ready(...)` AND `multifeed: _addFeed(...)`
  self.offerFeeds = function (keys, opts) { // opts = custom data for 'want' selections
    // 2. REMEMBER LOCAL OFFER — must be the same list the request handler checks,
    // otherwise every incoming request would be rejected as "non-offered".
    LOCALOFFER = [...LOCALOFFER, ...keys]
    self._manifestExt.send({ keys })
  }

  // PEER B receive OFFER
  self._manifestExt = extension(EXT_MANIFEST, msg => { // 3.
    REMOTE_OFFER = [...REMOTE_OFFER, ...msg.keys] // 4. ADD REMOTE OFFER
    self.emit('manifest', msg, self.requestFeeds) // => triggers REQUEST
  })

  // PEER B send REQUEST
  // 5. Sends your wishlist to remote AND `mux.on('manifest', function onManifest(m) { mux.requestFeeds(m.keys) }`
  // for classical multifeed `ACCEPT_ALL` behaviour both must call `want(remoteHas)`
  self.requestFeeds = function (keys) {
    REQUESTED_FEEDS = [...REQUESTED_FEEDS, ...keys] // 6. REMEMBER REQUESTED FEEDS
    self._requestFeedsExt.send(keys) // only request new feeds
  }

  // PEER A - receive REQUEST
  self._requestFeedsExt = extension(EXT_REQUEST_FEEDS, keys => { // 5a. by other PEER
    // Drop requests for feeds we never offered; accept (deduplicated) the rest.
    var filtered = uniq(keys.filter(key => LOCALOFFER.includes(key)))
    self._replicateFeedsExt.send(filtered) // Tell remote which keys we will replicate
    self._replicateFeeds(filtered) // Start replicating as promised
  })

  // PEER B - receive REPLICATION OFFER
  self._replicateFeedsExt = extension(EXT_REPLICATE_FEEDS, keys => { // feeds
    // Only replicate feeds that we actually requested.
    var filtered = keys.filter(key => REQUESTED_FEEDS.includes(key))
    // Start replicating as requested.
    self._replicateFeeds(filtered, () => self.stream.emit('remote-feeds'))
  })

  // Initializes new replication streams for feeds and joins their streams into
  // the main stream. Emits 'replicate' with the keys and a one-shot callback that
  // the listener calls with the matching feed objects. Returns `keys` unchanged.
  self._replicateFeeds = function (keys, cb) {
    if (!cb) cb = function noop () {} // the request handler calls this without a callback

    self.emit('replicate', keys, once(startFeedReplication))

    return keys

    // PEER A + B
    function startFeedReplication (feeds) {
      var pending = feeds.length

      // only the feeds passed to `feeds` option will be replicated (sent or received)
      // hypercore-protocol has built in protection against receiving unexpected/not asked for data.

      feeds.forEach(feed => {
        var hexKey = feed.key.toString('hex')

        // prevent a feed from being folded into the main stream twice.
        if (typeof ACTIVE_FEED_STREAMS[hexKey] !== 'undefined') {
          if (!--pending) cb()
          return
        }

        var fStream = feed.replicate(self._initiator, { // REPLICATE FEED
          live: opts.live,
          download: opts.download,
          upload: opts.upload,
          encrypt: opts.encrypt,
          stream: self.stream
        })

        ACTIVE_FEED_STREAMS[hexKey] = fStream // Store reference to this particular feed stream

        var cleanup = function () { // delete feed stream reference
          delete ACTIVE_FEED_STREAMS[hexKey]
        }
        fStream.once('end', cleanup)
        fStream.once('error', cleanup)

        if (!--pending) cb()
      })

      // Bail on replication entirely if there were no feeds to add, and none are pending or active.
      if (feeds.length === 0 && Object.keys(ACTIVE_FEED_STREAMS).length === 0) {
        debug('[REPLICATION] terminating mux: no feeds to sync')
        self._feed.close()
        process.nextTick(cb)
      }
    }
  }
}

multifeed - index

LOCAL PEER STORAGE

self._streams = [] // all peers in the form of `mux` objects; `replicate` pushes each new mux here

LOCAL FEED STORAGE

// Illustrative shape of the local feed store: one entry per locally known feed.
// NOTE(review): `_addFeed` stores feeds under `name` (and separately under the hex
// feed key in `_feedKeyToFeed`), so the `feedkeyN` placeholders here may really be
// feed names — confirm.
self._feeds = {
  [feedkey1]: feed1,
  [feedkey2]: feed2,
  [feedkey3]: feed3,
}

on READY offers all its FEED KEYS to PEERS

// Once the mux is ready, offer the hex key of every locally stored feed to the peer.
mux.ready(() => {
  const localFeeds = values(self._feeds)
  const hexKeys = localFeeds.map(feed => feed.key.toString('hex'))
  mux.offerFeeds(hexKeys)
})

stores locally and offers new FEED to PEERS

/**
 * Stores a feed locally, emits 'feed', and offers the feed's key to every
 * connected peer that does not already know it.
 *
 * @param {object} feed - feed with a `key` buffer and `setMaxListeners`
 * @param {string} name - key under which the feed is stored in `_feeds`
 */
Multifeed.prototype._addFeed = function (feed, name) {
  var self = this // bind the instance; the method body refers to it as `self`

  self._feeds[name] = feed
  var hexKey = feed.key.toString('hex')
  self._feedKeyToFeed[hexKey] = feed
  feed.setMaxListeners(Infinity)
  self.emit('feed', feed, name)

  // forward live feed announcements
  if (!self._streams.length) return // no-op if no live-connections
  // Tell each remote that we have a new key available unless it's already being replicated
  self._streams.forEach(function (mux) {
    if (!mux.knownFeeds().includes(hexKey)) mux.offerFeeds([hexKey])
  })
}

add new peer (mux)

/**
 * Adds a new peer: creates a multiplexer session for the connection, registers
 * it in `_streams`, wires up the key exchange, and returns the session's stream.
 *
 * NOTE(review): `multiplexer` and `addMissingKeys` are not defined in this excerpt;
 * `addMissingKeys` is assumed to return a promise (it is awaited) — confirm.
 *
 * @param {boolean} isInitiator - forwarded to the multiplexer
 * @param {object} [opts] - forwarded to the multiplexer, with `_id` overridden by this instance's `_id`
 * @returns {*} `mux.stream`
 */
Multifeed.prototype.replicate = function (isInitiator, opts) {
  var self = this

  // All multifeeds get a random or passed in `_id`
  // When "ready" they make a feed using a default or passed encryptionKey and set it as `_root` feed

  // MAKE SESSION
  var mux = multiplexer(isInitiator, self._root.key, Object.assign({}, opts, { _id: self._id }))
  /* on ready */ self._streams.push(mux)

  // KEY EXCHANGE listener: request every key the remote offers
  mux.on('manifest', function onManifest (m) { mux.requestFeeds(m.keys) })

  mux.on('replicate', async function onReplicate (keys, done) { // REPLICATION REQUEST?
    try {
      await addMissingKeys(keys)
      // 1. make sure all keys are proper feedkeys
      // 2. check none of the given feeds already exist
      // for all new keys:
      //   1. make a storage with name "myKey" is "self._feeds.length"
      //   2. make new hypercore for the new feed
      //   3. `self._addFeed(feed, myKey)`

      // => looks up all existing feeds based on given keys
      var feeds = keys.map(function (key) { return self._feedKeyToFeed[key] })

      // => calls `mux` callback to replicate those feeds
      done(feeds) // replicates
    } catch (err) {
      // Do not leave the rejection unhandled; tear the connection down instead.
      // NOTE(review): assumes `mux.stream` exposes a Node-style `destroy(err)` — confirm.
      mux.stream.destroy(err)
    }
  })

  return mux.stream
}

from datdot-service.

serapath avatar serapath commented on August 23, 2024

proposal draft:
see datdotorg/datdot-research#17 (comment)

from datdot-service.

serapath avatar serapath commented on August 23, 2024

@okdistribute When checking, I found that cabal-core uses kappa-core and kappa-core uses multifeed, so it felt like, underneath, that's what's happening at the level of exchanging hypercores.
can you recommend me where or what i should be looking at additionally?

from datdot-service.

okdistribute avatar okdistribute commented on August 23, 2024

Sorry, I think I wasn't clear: the data structure that handles the feeds in cabal is multifeed, and in hyperdrive it's corestore. So any app built on multifeed or corestore could work with datdot if you target those data structures rather than a particular application like hyperdrive or cabal.

from datdot-service.

Related Issues (15)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.