// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * transport-common.h:
 *   template support for implementing transports
 *
 * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
 *                     Naveen Kr. Sharma <nksharma@cs.washington.edu>
 *                     Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/

#ifndef _LIB_TRANSPORTCOMMON_H_
#define _LIB_TRANSPORTCOMMON_H_

#include "lib/assert.h"
#include "lib/configuration.h"
#include "lib/transport.h"

#include <map>
#include <unordered_map>

/**
 * Address-type-independent plumbing shared by Transport implementations.
 *
 * ADDR is the concrete address type used by the derived transport.
 * This class keeps track of which configuration each registered
 * receiver belongs to, caches the replica (and optional multicast)
 * addresses of every known configuration, and implements the
 * SendMessage* entry points in terms of SendMessageInternal(), which
 * the derived transport must provide.
 */
template <typename ADDR>
class TransportCommon : public Transport
{

public:
    TransportCommon()
        : replicaAddressesInitialized(false)
    {
    }

    // The canonical Configuration objects are held through raw
    // pointers and freed in the destructor, so a memberwise copy
    // would free every one of them twice. Forbid copying.
    TransportCommon(const TransportCommon &) = delete;
    TransportCommon &operator=(const TransportCommon &) = delete;

    // Frees every canonical configuration created by
    // RegisterConfiguration().
    virtual
    ~TransportCommon()
    {
        for (auto &kv : canonicalConfigs) {
            delete kv.second;
        }
    }

    // Sends m from src to the given address. dst must actually be an
    // ADDR; otherwise the reference dynamic_cast throws std::bad_cast.
    // Returns whatever SendMessageInternal() returns.
    virtual bool
    SendMessage(TransportReceiver *src, const TransportAddress &dst,
                const Message &m)
    {
        const ADDR &dstAddr = dynamic_cast<const ADDR &>(dst);
        return SendMessageInternal(src, dstAddr, m, false);
    }

    // Sends m from src to replica number replicaIdx of the
    // configuration src was registered with. Asserts that src has
    // been registered and that an address is known for replicaIdx.
    virtual bool
    SendMessageToReplica(TransportReceiver *src, int replicaIdx,
                         const Message &m)
    {
        const specpaxos::Configuration *cfg = FindConfiguration(src);
        ASSERT(cfg != nullptr);

        if (!replicaAddressesInitialized) {
            LookupAddresses();
        }

        // Look the address up with find() so that a miss does not
        // insert an empty entry into the cache.
        auto cfgAddrs = replicaAddresses.find(cfg);
        ASSERT(cfgAddrs != replicaAddresses.end());

        auto kv = cfgAddrs->second.find(replicaIdx);
        ASSERT(kv != cfgAddrs->second.end());

        return SendMessageInternal(src, kv->second, m, false);
    }

    // Sends m from src to every replica of the configuration src was
    // registered with. A cached multicast address is used if there is
    // one; otherwise the message is sent to each replica address in
    // turn, skipping any address equal to src's own. In the latter
    // case sending stops at the first failure and false is returned.
    virtual bool
    SendMessageToAll(TransportReceiver *src, const Message &m)
    {
        const specpaxos::Configuration *cfg = FindConfiguration(src);
        ASSERT(cfg != nullptr);

        if (!replicaAddressesInitialized) {
            LookupAddresses();
        }

        auto mcast = multicastAddresses.find(cfg);
        if (mcast != multicastAddresses.end()) {
            // Send by multicast if we can
            return SendMessageInternal(src, mcast->second, m, true);
        }

        // ...or by individual messages to every replica if not
        const ADDR &srcAddr = dynamic_cast<const ADDR &>(src->GetAddress());

        auto cfgAddrs = replicaAddresses.find(cfg);
        if (cfgAddrs == replicaAddresses.end()) {
            // No addresses cached for this configuration: nothing to
            // send, which counts as success.
            return true;
        }

        for (auto &kv : cfgAddrs->second) {
            if (srcAddr == kv.second) {
                continue;
            }
            if (!SendMessageInternal(src, kv.second, m, false)) {
                return false;
            }
        }
        return true;
    }

protected:
    // Transport-specific send. The multicast argument is true only
    // when dst is the address obtained from LookupMulticastAddress().
    virtual bool SendMessageInternal(TransportReceiver *src,
                                     const ADDR &dst,
                                     const Message &m,
                                     bool multicast = false) = 0;

    // Returns the address of replica replicaIdx in cfg.
    virtual ADDR LookupAddress(const specpaxos::Configuration &cfg,
                               int replicaIdx) = 0;

    // Returns the multicast address of cfg, or a null pointer if none
    // is available. LookupAddresses() copies the result and then
    // deletes it, so a non-null result must be allocated with new.
    virtual const ADDR *
    LookupMulticastAddress(const specpaxos::Configuration *cfg) = 0;

    // Owning map from a configuration (by value) to its single
    // canonical heap copy; the pointer is the key used by the maps
    // below.
    std::unordered_map<specpaxos::Configuration,
                       specpaxos::Configuration *> canonicalConfigs;
    // Canonical configuration of each registered receiver.
    std::map<TransportReceiver *,
             specpaxos::Configuration *> configurations;
    // Cached replica addresses, per configuration and replica index.
    // Rebuilt by LookupAddresses().
    std::map<const specpaxos::Configuration *,
             std::map<int, ADDR> > replicaAddresses;
    // Receivers registered as replicas, per configuration and replica
    // index.
    std::map<const specpaxos::Configuration *,
             std::map<int, TransportReceiver *> > replicaReceivers;
    // Cached multicast address of each configuration that has one.
    // Rebuilt by LookupAddresses().
    std::map<const specpaxos::Configuration *, ADDR> multicastAddresses;
    // False whenever the two address caches above may be stale.
    bool replicaAddressesInitialized;

    // Associates receiver with config and returns the canonical copy
    // of config, which remains owned by this object. A replicaIdx
    // other than -1 additionally records receiver as that replica of
    // the configuration. Existing entries for the same receiver or
    // replica index are left untouched.
    virtual specpaxos::Configuration *
    RegisterConfiguration(TransportReceiver *receiver,
                          const specpaxos::Configuration &config,
                          int replicaIdx)
    {
        ASSERT(receiver != nullptr);

        // Have we seen this configuration before? If so, get a
        // pointer to the canonical copy; if not, create one. This
        // allows us to use that pointer as a key in various
        // structures. The slot is created (as null) before the copy
        // is allocated, so a failed insertion cannot leak the copy.
        specpaxos::Configuration *&slot = canonicalConfigs[config];
        if (slot == nullptr) {
            slot = new specpaxos::Configuration(config);
        }
        specpaxos::Configuration *canonical = slot;

        // Record configuration
        configurations.insert(std::make_pair(receiver, canonical));

        // If this is a replica, record the receiver
        if (replicaIdx != -1) {
            replicaReceivers[canonical].insert(std::make_pair(replicaIdx,
                                                              receiver));
        }

        // Mark replicaAddresses as uninitialized so we'll look up
        // replica addresses again the next time we send a message.
        replicaAddressesInitialized = false;

        return canonical;
    }

    // Rebuilds the replica and multicast address caches from scratch
    // for every known configuration and marks them as initialized.
    virtual void
    LookupAddresses()
    {
        // Clear any existing list of addresses
        replicaAddresses.clear();
        multicastAddresses.clear();

        // For every configuration, look up all addresses and cache
        // them.
        for (auto &kv : canonicalConfigs) {
            specpaxos::Configuration *cfg = kv.second;

            for (int i = 0; i < cfg->n; i++) {
                const ADDR addr = LookupAddress(*cfg, i);
                replicaAddresses[cfg].insert(std::make_pair(i, addr));
            }

            // And check if there's a multicast address
            if (cfg->multicast()) {
                const ADDR *addr = LookupMulticastAddress(cfg);
                if (addr) {
                    multicastAddresses.insert(std::make_pair(cfg, *addr));
                    // The lookup handed us ownership; the map now
                    // holds its own copy.
                    delete addr;
                }
            }
        }

        replicaAddressesInitialized = true;
    }

private:
    // Returns the canonical configuration receiver was registered
    // with, or a null pointer if it was never registered. Uses find()
    // rather than operator[] so that a lookup can never insert a null
    // entry, which would stop a later RegisterConfiguration() (which
    // uses insert) from recording the receiver.
    const specpaxos::Configuration *
    FindConfiguration(TransportReceiver *receiver) const
    {
        auto it = configurations.find(receiver);
        return (it == configurations.end()) ? nullptr : it->second;
    }
};

#endif // _LIB_TRANSPORTCOMMON_H_