Commit 884bdad4 authored by Weixin Deng's avatar Weixin Deng
Browse files

(WIP) Try simple homa impl

parent b847b81d
......@@ -80,15 +80,15 @@ func parseArgs() Args {
}
type Request struct {
srcAddr string
dstAddr string
SrcAddr string
DstAddr string
DataSize int
ServerDelay time.Duration
}
type Reply struct {
srcAddr string
dstAddr string
SrcAddr string
DstAddr string
Data []byte
ServerDelay time.Duration
}
......@@ -119,7 +119,7 @@ func (c *Client) SendRequest(
wgStart *sync.WaitGroup,
wgFinish *sync.WaitGroup) {
defer wgFinish.Done()
client, err := rpc.Dial("tcp", request.dstAddr)
client, err := rpc.Dial("tcp", request.DstAddr)
Assert(err == nil, "[Client error] ", err)
defer client.Close()
var reply Reply
......@@ -156,8 +156,8 @@ func (s *Server) HandleRequest(request Request, reply *Reply) error {
var clock Clock
clock.Record()
*reply = Reply{
srcAddr: request.dstAddr,
dstAddr: request.srcAddr,
SrcAddr: request.DstAddr,
DstAddr: request.SrcAddr,
Data: s.data[:request.DataSize],
}
time.Sleep(request.ServerDelay)
......@@ -171,7 +171,7 @@ func startClient(args *Args) {
addr: args.addr,
}
request := Request{
srcAddr: client.addr,
SrcAddr: client.addr,
DataSize: args.dataSize,
ServerDelay: time.Duration(args.serverDelay) * time.Microsecond,
}
......@@ -183,7 +183,7 @@ func startClient(args *Args) {
for i := 0; i < args.numServers; i++ {
wgInit.Add(1)
wgFinish.Add(1)
request.dstAddr = args.servers[i]
request.DstAddr = args.servers[i]
go client.SendRequest(request, &clocks[i], &wgInit, &wgStart, &wgFinish)
}
wgInit.Wait()
......
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/rpc"
"strconv"
"sync"
"time"
)
func Assert(assert bool, msg ...any) {
if !assert {
log.Fatal(msg...)
}
}
func setTimeZone() {
loc, err := time.LoadLocation("US/Pacific")
Assert(err == nil, err)
time.Local = loc
}
type Args struct {
startServer bool
startClient bool
addr string
numPorts int
ports []string
numBackends int
numServers int
servers []string
dataSize int
serverDelay int
repeat int
}
func parseArgs() Args {
var startServer = flag.Bool("server", false, "Start server")
var startClient = flag.Bool("client", false, "Start client")
var addr = flag.String("addr", "", "Address")
var numPorts = flag.Int("numPorts", 0, "Number of ports")
var numBackends = flag.Int("numBackends", 0, "Number of backends")
var dataSize = flag.Int("dataSize", 1, "Data size")
var serverDelay = flag.Int("serverDelay", 0, "Server delay in microseconds")
var repeat = flag.Int("repeat", 1, "Repeat")
flag.Parse()
Assert(len(*addr) > 0)
numServers := *numPorts * *numBackends
args := Args{
startServer: *startServer,
startClient: *startClient,
addr: *addr,
numPorts: *numPorts,
numBackends: *numBackends,
numServers: numServers,
dataSize: *dataSize,
serverDelay: *serverDelay,
repeat: *repeat,
}
var ports []string
for i := 0; i < *numPorts; i++ {
port := strconv.Itoa(9000 + i + 1)
ports = append(ports, port)
}
args.ports = ports
var servers []string
for i := 0; i < *numBackends; i++ {
backendAddr := "10.0.0." + strconv.Itoa(i+2)
for j := 0; j < *numPorts; j++ {
serverAddr := backendAddr + ":" + args.ports[j]
servers = append(servers, serverAddr)
}
}
args.servers = servers
return args
}
type Request struct {
SrcAddr string
DstAddr string
DataSize int
ServerDelay time.Duration
}
type Reply struct {
SrcAddr string
DstAddr string
Data []byte
ServerDelay time.Duration
}
type Clock struct {
Start time.Time
End time.Time
Duration time.Duration
}
func (t *Clock) Record() {
t.Start = time.Now()
}
func (t *Clock) Stop() {
t.End = time.Now()
t.Duration = t.End.Sub(t.Start)
}
type Client struct {
addr string
}
func (c *Client) SendRequest(
request Request,
clock *Clock,
wgInit *sync.WaitGroup,
wgStart *sync.WaitGroup,
wgFinish *sync.WaitGroup) {
defer wgFinish.Done()
client, err := rpc.Dial("tcp", request.DstAddr)
Assert(err == nil, "[Client error] ", err)
defer client.Close()
var reply Reply
wgInit.Done()
wgStart.Wait()
clock.Record()
err = client.Call("Server.HandleRequest", request, &reply)
Assert(err == nil, "[Client error] ", err)
clock.Stop()
}
type Server struct {
addr string
server *rpc.Server
listener net.Listener
data []byte
}
func (s *Server) Start() {
s.server = rpc.NewServer()
s.server.Register(s)
listener, err := net.Listen("tcp", s.addr)
Assert(err == nil, "[Server error] ", err)
s.listener = listener
maxSize := 1 << 20
for i := 0; i < maxSize; i++ {
s.data = append(s.data, '0')
}
fmt.Printf("Listening to %s\n", s.addr)
s.server.Accept(listener)
}
func (s *Server) HandleRequest(request Request, reply *Reply) error {
var clock Clock
clock.Record()
*reply = Reply{
SrcAddr: request.DstAddr,
DstAddr: request.SrcAddr,
Data: s.data[:request.DataSize],
}
time.Sleep(request.ServerDelay)
clock.Stop()
reply.ServerDelay = clock.Duration
return nil
}
func startClient(args *Args) {
client := Client{
addr: args.addr,
}
request := Request{
SrcAddr: client.addr,
DataSize: args.dataSize,
ServerDelay: time.Duration(args.serverDelay) * time.Microsecond,
}
clocks := make([]Clock, args.numServers)
var wgInit sync.WaitGroup
var wgStart sync.WaitGroup
var wgFinish sync.WaitGroup
wgStart.Add(1)
for i := 0; i < args.numServers; i++ {
wgInit.Add(1)
wgFinish.Add(1)
request.DstAddr = args.servers[i]
go client.SendRequest(request, &clocks[i], &wgInit, &wgStart, &wgFinish)
}
wgInit.Wait()
wgStart.Done()
wgFinish.Wait()
var start time.Time
var end time.Time
var delayAvg time.Duration
var delayTotal time.Duration
for i := 0; i < args.numServers; i++ {
clock := clocks[i]
if start.IsZero() || clock.Start.Before(start) {
start = clock.Start
}
if end.IsZero() || clock.End.After(end) {
end = clock.End
}
delayAvg += clock.Duration
}
delayAvg /= time.Duration(args.numServers)
delayTotal = end.Sub(start)
dataSizeTotal := args.dataSize * 8 * args.numServers
throughput := int64(dataSizeTotal) * 1e9 / delayTotal.Nanoseconds()
fmt.Println()
fmt.Println("DstAddr,Start,End,Duration,DurationUS")
for i := 0; i < args.numServers; i++ {
dstAddr := args.servers[i]
clock := clocks[i]
fmt.Printf(
"%s,%s,%s,%s,%d\n",
dstAddr,
clock.Start.Format("15:04:05.000000000"),
clock.End.Format("15:04:05.000000000"),
clock.Duration,
clock.Duration.Microseconds(),
)
}
fmt.Printf("DelayAvg: %s\n", delayAvg)
fmt.Printf("DelayTotal: %s\n", delayTotal)
fmt.Printf("Throughput: %dbps\n", throughput)
time.Sleep(1 * time.Second)
}
func startServer(args *Args) {
var wg sync.WaitGroup
wg.Add(1)
for i := 0; i < args.numPorts; i++ {
addr := args.addr + ":" + args.ports[i]
server := Server{
addr: addr,
}
go server.Start()
}
wg.Wait()
}
func main() {
setTimeZone()
args := parseArgs()
if args.startClient {
params := make(map[string]any)
params["Ports"] = args.numPorts
params["Backends"] = args.numBackends
params["Servers"] = args.numServers
params["DataSize"] = args.dataSize
params["ServerDelay"] = args.serverDelay
paramsJson, err := json.MarshalIndent(params, "", "\t")
Assert(err == nil, err)
fmt.Println(string(paramsJson))
for i := 0; i < args.repeat; i++ {
startClient(&args)
}
}
if args.startServer {
startServer(&args)
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment