Skip to content
Snippets Groups Projects
simtransport.cc 6.64 KiB
Newer Older
Irene Y Zhang's avatar
Irene Y Zhang committed
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * simtransport.cc:
 *   simulated message-passing interface for testing use
 *
 * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
 *                     Naveen Kr. Sharma <naveenks@cs.washington.edu>
 *                     Dan R. K. Ports  <drkp@cs.washington.edu>
Irene Y Zhang's avatar
Irene Y Zhang committed
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/

#include "lib/assert.h"
#include "lib/message.h"
#include "lib/simtransport.h"
Irene Y Zhang's avatar
Irene Y Zhang committed
#include <google/protobuf/message.h>

SimulatedTransportAddress::SimulatedTransportAddress(int addr)
    : addr(addr)
{
    
}

int
SimulatedTransportAddress::GetAddr() const
{
    return addr;
}

SimulatedTransportAddress *
SimulatedTransportAddress::clone() const
{
    SimulatedTransportAddress *c = new SimulatedTransportAddress(addr);
    return c;
}

bool
SimulatedTransportAddress::operator==(const SimulatedTransportAddress &other) const
{
    return addr == other.addr;
}

SimulatedTransport::SimulatedTransport()
{
    lastAddr = -1;
    lastTimerId = 0;
    vtime = 0;
    processTimers = true;
}

SimulatedTransport::~SimulatedTransport()
{
    
}

void
SimulatedTransport::Register(TransportReceiver *receiver,
                             const transport::Configuration &config,
Irene Y Zhang's avatar
Irene Y Zhang committed
                             int replicaIdx)
{
    // Allocate an endpoint
    ++lastAddr;
    int addr = lastAddr;
    endpoints[addr] = receiver;

    // Tell the receiver its address
    receiver->SetAddress(new SimulatedTransportAddress(addr));

    RegisterConfiguration(receiver, config, replicaIdx);

    // If this is registered as a replica, record the index
    replicaIdxs[addr] = replicaIdx;
}

bool
SimulatedTransport::SendMessageInternal(TransportReceiver *src,
                                        const SimulatedTransportAddress &dstAddr,
                                        const Message &m,
                                        bool multicast)
{
    ASSERT(!multicast);
    
    int dst = dstAddr.addr;
    
    Message *msg = m.New();
    msg->CheckTypeAndMergeFrom(m);
    
    int srcAddr =
        dynamic_cast<const SimulatedTransportAddress &>(src->GetAddress()).addr;

    uint64_t delay = 0;
    for (auto f : filters) {
        if (!f.second(src, replicaIdxs[srcAddr],
                      endpoints[dst], replicaIdxs[dst],
                      *msg, delay)) {
            // Message dropped by filter
            // XXX Should we return failure?
            delete msg;
            return true;
        }
    }
                      
    string msgData;
    msg->SerializeToString(&msgData);
    delete msg;
    
    QueuedMessage q(dst, srcAddr, m.GetTypeName(), msgData);

    if (delay == 0) {
        queue.push_back(q);
    } else {
        Timer(delay, [=]() {
                queue.push_back(q);
            });
    }
    return true;    
}

SimulatedTransportAddress
SimulatedTransport::LookupAddress(const transport::Configuration &cfg,
Irene Y Zhang's avatar
Irene Y Zhang committed
                                  int idx)
{
    // Check every registered replica to see if its configuration and
    // idx match. This is the least efficient possible way to do this,
    // but that's why this is the simulated transport not the real
    // one... (And we only have to do this once at runtime.)
    

    for (auto & kv : configurations) {
        if (*(kv.second) == cfg) {
            // Configuration matches. Does the index?
            const SimulatedTransportAddress &addr =
                dynamic_cast<const SimulatedTransportAddress&>(kv.first->GetAddress());
            if (replicaIdxs[addr.addr] == idx) {
                // Matches.
                return addr;
            }
        }
    }
    
    Panic("No replica %d was registered", idx);
}

const SimulatedTransportAddress *
SimulatedTransport::LookupMulticastAddress(const transport::Configuration *cfg)
Irene Y Zhang's avatar
Irene Y Zhang committed
{
    return NULL;
}

void
SimulatedTransport::Run()
{
    LookupAddresses();
    
    do {
        // Process queue
        while (!queue.empty()) {
            QueuedMessage &q = queue.front();
            TransportReceiver *dst = endpoints[q.dst];
            dst->ReceiveMessage(SimulatedTransportAddress(q.src), q.type, q.msg);
            queue.pop_front();
        }

        // If there's a timer, deliver the earliest one only
        if (processTimers && !timers.empty()) {
            auto iter = timers.begin();
            ASSERT(iter->second.when >= vtime);
            vtime = iter->second.when;
            timer_callback_t cb = iter->second.cb;
            timers.erase(iter);
            cb();
        }
        
        // ...then retry to see if there are more queued messages to
        // deliver first
    } while (!queue.empty() || (processTimers && !timers.empty()));
}

void
SimulatedTransport::AddFilter(int id, filter_t filter)
{
    filters.insert(std::pair<int,filter_t>(id, filter));
}

void
SimulatedTransport::RemoveFilter(int id)
{
    filters.erase(id);
}


int
SimulatedTransport::Timer(uint64_t ms, timer_callback_t cb)
{
    ++lastTimerId;
    int id = lastTimerId;
    PendingTimer t;
    t.when = vtime + ms;
    t.cb = cb;
    t.id = id;
    timers.insert(std::pair<uint64_t, PendingTimer>(t.when, t));
    return id;
}

bool
SimulatedTransport::CancelTimer(int id)
{
    bool found = false;
    for (auto iter = timers.begin(); iter != timers.end();) {
        if (iter->second.id == id) {
            found = true;
            iter = timers.erase(iter);
        } else {
            iter++;
        }
    }
    
    return found;
}

void
SimulatedTransport::CancelAllTimers()
{
    timers.clear();
    processTimers = false;
}