Commit f2e36ee2 authored by Weixin Deng's avatar Weixin Deng
Browse files

(WIP) Add simple homa

parent f7963756
go run ../src/service.go \
go run ../src/service_homa.go \
-client \
-addr "10.0.0.1" \
-numPorts 8 \
......
......@@ -15,7 +15,7 @@ class ClientConfig:
def generate_cmd(config: ClientConfig):
cmd = [
"go run ../src/service.go",
"go run ../src/service_homa.go",
"-client",
'-addr "10.0.0.1"',
f"-numPorts {config.num_ports}",
......
go run ../src/service.go \
go run ../src/service_homa.go \
-server \
-addr "10.0.0.2" \
-numPorts 64
......@@ -34,6 +34,7 @@ type Args struct {
numServers int
servers []string
dataSize int
rttSize int
serverDelay int
repeat int
}
......@@ -45,6 +46,7 @@ func parseArgs() Args {
var numPorts = flag.Int("numPorts", 0, "Number of ports")
var numBackends = flag.Int("numBackends", 0, "Number of backends")
var dataSize = flag.Int("dataSize", 1, "Data size")
var rttSize = flag.Int("rttSize", 16, "RTT size")
var serverDelay = flag.Int("serverDelay", 0, "Server delay in microseconds")
var repeat = flag.Int("repeat", 1, "Repeat")
flag.Parse()
......@@ -58,6 +60,7 @@ func parseArgs() Args {
numBackends: *numBackends,
numServers: numServers,
dataSize: *dataSize,
rttSize: *rttSize,
serverDelay: *serverDelay,
repeat: *repeat,
}
......@@ -84,6 +87,7 @@ type Request struct {
DstAddr string
DataSize int
ServerDelay time.Duration
AckSize int
}
type Reply struct {
......@@ -91,6 +95,7 @@ type Reply struct {
DstAddr string
Data []byte
ServerDelay time.Duration
AckSize int
}
type Clock struct {
......@@ -119,15 +124,22 @@ func (c *Client) SendRequest(
wgStart *sync.WaitGroup,
wgFinish *sync.WaitGroup) {
defer wgFinish.Done()
client, err := rpc.Dial("tcp", request.DstAddr)
conn, err := net.DialTimeout("tcp", request.DstAddr, time.Hour)
Assert(err == nil, "[Client error] ", err)
client := rpc.NewClient(conn)
defer client.Close()
var reply Reply
wgInit.Done()
wgStart.Wait()
clock.Record()
err = client.Call("Server.HandleRequest", request, &reply)
Assert(err == nil, "[Client error] ", err)
for {
var reply Reply
err = client.Call("Server.HandleRequest", request, &reply)
Assert(err == nil, "[Client error] ", err)
if reply.AckSize >= request.DataSize {
break
}
request.AckSize = reply.AckSize
}
clock.Stop()
}
......@@ -136,6 +148,7 @@ type Server struct {
server *rpc.Server
listener net.Listener
data []byte
rttSize int
}
func (s *Server) Start() {
......@@ -155,10 +168,15 @@ func (s *Server) Start() {
func (s *Server) HandleRequest(request Request, reply *Reply) error {
var clock Clock
clock.Record()
ackSize := request.AckSize + s.rttSize
if ackSize > request.DataSize {
ackSize = request.DataSize
}
*reply = Reply{
SrcAddr: request.DstAddr,
DstAddr: request.SrcAddr,
Data: s.data[:request.DataSize],
Data: s.data[request.AckSize:ackSize],
AckSize: ackSize,
}
time.Sleep(request.ServerDelay)
clock.Stop()
......@@ -174,6 +192,7 @@ func startClient(args *Args) {
SrcAddr: client.addr,
DataSize: args.dataSize,
ServerDelay: time.Duration(args.serverDelay) * time.Microsecond,
AckSize: 0,
}
clocks := make([]Clock, args.numServers)
var wgInit sync.WaitGroup
......@@ -233,7 +252,8 @@ func startServer(args *Args) {
for i := 0; i < args.numPorts; i++ {
addr := args.addr + ":" + args.ports[i]
server := Server{
addr: addr,
addr: addr,
rttSize: args.rttSize,
}
go server.Start()
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment