service.go 5.75 KB
Newer Older
Weixin Deng's avatar
Weixin Deng committed
1
2
3
package main

import (
Weixin Deng's avatar
Weixin Deng committed
4
	"encoding/json"
Weixin Deng's avatar
Weixin Deng committed
5
6
7
8
9
	"flag"
	"fmt"
	"log"
	"net"
	"net/rpc"
Weixin Deng's avatar
Weixin Deng committed
10
	"strconv"
11
	"sync"
Weixin Deng's avatar
Weixin Deng committed
12
	"time"
Weixin Deng's avatar
Weixin Deng committed
13
14
)

15
func Assert(assert bool, msg ...any) {
Weixin Deng's avatar
Weixin Deng committed
16
	if !assert {
17
		log.Fatal(msg...)
Weixin Deng's avatar
Weixin Deng committed
18
19
20
	}
}

Weixin Deng's avatar
Weixin Deng committed
21
22
23
24
25
26
func setTimeZone() {
	loc, err := time.LoadLocation("US/Pacific")
	Assert(err == nil, err)
	time.Local = loc
}

Weixin Deng's avatar
Weixin Deng committed
27
28
29
30
31
32
type Args struct {
	startServer bool
	startClient bool
	addr        string
	numPorts    int
	ports       []string
Weixin Deng's avatar
Weixin Deng committed
33
	numBackends int
Weixin Deng's avatar
Weixin Deng committed
34
35
36
37
38
39
40
41
42
43
	numServers  int
	servers     []string
	dataSize    int
	serverDelay int
	repeat      int
}

func parseArgs() Args {
	var startServer = flag.Bool("server", false, "Start server")
	var startClient = flag.Bool("client", false, "Start client")
Weixin Deng's avatar
Weixin Deng committed
44
	var addr = flag.String("addr", "", "Address")
Weixin Deng's avatar
Weixin Deng committed
45
	var numPorts = flag.Int("numPorts", 0, "Number of ports")
Weixin Deng's avatar
Weixin Deng committed
46
	var numBackends = flag.Int("numBackends", 0, "Number of backends")
Weixin Deng's avatar
Weixin Deng committed
47
48
49
50
	var dataSize = flag.Int("dataSize", 1, "Data size")
	var serverDelay = flag.Int("serverDelay", 0, "Server delay in microseconds")
	var repeat = flag.Int("repeat", 1, "Repeat")
	flag.Parse()
Weixin Deng's avatar
Weixin Deng committed
51
52
	Assert(len(*addr) > 0)
	numServers := *numPorts * *numBackends
Weixin Deng's avatar
Weixin Deng committed
53
54
55
56
57
	args := Args{
		startServer: *startServer,
		startClient: *startClient,
		addr:        *addr,
		numPorts:    *numPorts,
Weixin Deng's avatar
Weixin Deng committed
58
		numBackends: *numBackends,
Weixin Deng's avatar
Weixin Deng committed
59
		numServers:  numServers,
Weixin Deng's avatar
Weixin Deng committed
60
61
62
63
		dataSize:    *dataSize,
		serverDelay: *serverDelay,
		repeat:      *repeat,
	}
Weixin Deng's avatar
Weixin Deng committed
64
65
66
67
	var ports []string
	for i := 0; i < *numPorts; i++ {
		port := strconv.Itoa(9000 + i + 1)
		ports = append(ports, port)
Weixin Deng's avatar
Weixin Deng committed
68
	}
Weixin Deng's avatar
Weixin Deng committed
69
70
71
72
73
74
75
	args.ports = ports
	var servers []string
	for i := 0; i < *numBackends; i++ {
		backendAddr := "10.0.0." + strconv.Itoa(i+2)
		for j := 0; j < *numPorts; j++ {
			serverAddr := backendAddr + ":" + args.ports[j]
			servers = append(servers, serverAddr)
Weixin Deng's avatar
Weixin Deng committed
76
77
		}
	}
Weixin Deng's avatar
Weixin Deng committed
78
	args.servers = servers
Weixin Deng's avatar
Weixin Deng committed
79
80
81
	return args
}

Weixin Deng's avatar
Weixin Deng committed
82
type Request struct {
Weixin Deng's avatar
Weixin Deng committed
83
84
85
86
	srcAddr     string
	dstAddr     string
	DataSize    int
	ServerDelay time.Duration
Weixin Deng's avatar
Weixin Deng committed
87
88
89
}

type Reply struct {
Weixin Deng's avatar
Weixin Deng committed
90
91
92
93
	srcAddr     string
	dstAddr     string
	Data        []byte
	ServerDelay time.Duration
Weixin Deng's avatar
Weixin Deng committed
94
95
}

Weixin Deng's avatar
Weixin Deng committed
96
type Clock struct {
97
98
99
100
101
	Start    time.Time
	End      time.Time
	Duration time.Duration
}

Weixin Deng's avatar
Weixin Deng committed
102
func (t *Clock) Record() {
103
104
105
	t.Start = time.Now()
}

Weixin Deng's avatar
Weixin Deng committed
106
func (t *Clock) Stop() {
107
108
109
110
	t.End = time.Now()
	t.Duration = t.End.Sub(t.Start)
}

Weixin Deng's avatar
Weixin Deng committed
111
type Client struct {
Weixin Deng's avatar
Weixin Deng committed
112
	addr string
Weixin Deng's avatar
Weixin Deng committed
113
114
}

115
116
117
118
119
120
121
func (c *Client) SendRequest(
	request Request,
	clock *Clock,
	wgInit *sync.WaitGroup,
	wgStart *sync.WaitGroup,
	wgFinish *sync.WaitGroup) {
	defer wgFinish.Done()
Weixin Deng's avatar
Weixin Deng committed
122
	client, err := rpc.Dial("tcp", request.dstAddr)
123
	Assert(err == nil, "[Client error] ", err)
Weixin Deng's avatar
Weixin Deng committed
124
	defer client.Close()
125
	var reply Reply
126
127
	wgInit.Done()
	wgStart.Wait()
Weixin Deng's avatar
Weixin Deng committed
128
	clock.Record()
129
	err = client.Call("Server.HandleRequest", request, &reply)
130
	Assert(err == nil, "[Client error] ", err)
Weixin Deng's avatar
Weixin Deng committed
131
	clock.Stop()
Weixin Deng's avatar
Weixin Deng committed
132
133
134
135
136
137
}

type Server struct {
	addr     string
	server   *rpc.Server
	listener net.Listener
Weixin Deng's avatar
Weixin Deng committed
138
	data     []byte
Weixin Deng's avatar
Weixin Deng committed
139
140
}

Weixin Deng's avatar
Weixin Deng committed
141
func (s *Server) Start() {
Weixin Deng's avatar
Weixin Deng committed
142
143
144
	s.server = rpc.NewServer()
	s.server.Register(s)
	listener, err := net.Listen("tcp", s.addr)
145
	Assert(err == nil, "[Server error] ", err)
Weixin Deng's avatar
Weixin Deng committed
146
	s.listener = listener
Weixin Deng's avatar
Weixin Deng committed
147
148
149
150
	maxSize := 1 << 20
	for i := 0; i < maxSize; i++ {
		s.data = append(s.data, '0')
	}
151
	fmt.Printf("Listening to %s\n", s.addr)
Weixin Deng's avatar
Weixin Deng committed
152
	s.server.Accept(listener)
Weixin Deng's avatar
Weixin Deng committed
153
154
155
}

func (s *Server) HandleRequest(request Request, reply *Reply) error {
Weixin Deng's avatar
Weixin Deng committed
156
157
	var clock Clock
	clock.Record()
Weixin Deng's avatar
Weixin Deng committed
158
159
160
161
162
163
	*reply = Reply{
		srcAddr: request.dstAddr,
		dstAddr: request.srcAddr,
		Data:    s.data[:request.DataSize],
	}
	time.Sleep(request.ServerDelay)
Weixin Deng's avatar
Weixin Deng committed
164
165
	clock.Stop()
	reply.ServerDelay = clock.Duration
Weixin Deng's avatar
Weixin Deng committed
166
167
168
	return nil
}

Weixin Deng's avatar
Weixin Deng committed
169
170
171
func startClient(args *Args) {
	client := Client{
		addr: args.addr,
Weixin Deng's avatar
Weixin Deng committed
172
	}
Weixin Deng's avatar
Weixin Deng committed
173
174
175
176
	request := Request{
		srcAddr:     client.addr,
		DataSize:    args.dataSize,
		ServerDelay: time.Duration(args.serverDelay) * time.Microsecond,
177
	}
Weixin Deng's avatar
Weixin Deng committed
178
	clocks := make([]Clock, args.numServers)
179
180
181
182
	var wgInit sync.WaitGroup
	var wgStart sync.WaitGroup
	var wgFinish sync.WaitGroup
	wgStart.Add(1)
Weixin Deng's avatar
Weixin Deng committed
183
	for i := 0; i < args.numServers; i++ {
184
185
		wgInit.Add(1)
		wgFinish.Add(1)
Weixin Deng's avatar
Weixin Deng committed
186
		request.dstAddr = args.servers[i]
187
		go client.SendRequest(request, &clocks[i], &wgInit, &wgStart, &wgFinish)
Weixin Deng's avatar
Weixin Deng committed
188
	}
189
190
191
	wgInit.Wait()
	wgStart.Done()
	wgFinish.Wait()
Weixin Deng's avatar
Weixin Deng committed
192
193
194
195
196
197
198
199
	var start time.Time
	var end time.Time
	var delayAvg time.Duration
	var delayTotal time.Duration
	for i := 0; i < args.numServers; i++ {
		clock := clocks[i]
		if start.IsZero() || clock.Start.Before(start) {
			start = clock.Start
200
		}
Weixin Deng's avatar
Weixin Deng committed
201
202
203
204
205
206
207
208
209
		if end.IsZero() || clock.End.After(end) {
			end = clock.End
		}
		delayAvg += clock.Duration
	}
	delayAvg /= time.Duration(args.numServers)
	delayTotal = end.Sub(start)
	dataSizeTotal := args.dataSize * 8 * args.numServers
	throughput := int64(dataSizeTotal) * 1e9 / delayTotal.Nanoseconds()
Weixin Deng's avatar
Weixin Deng committed
210
	fmt.Println()
Weixin Deng's avatar
Weixin Deng committed
211
212
213
214
	fmt.Println("DstAddr,Start,End,Duration,DurationUS")
	for i := 0; i < args.numServers; i++ {
		dstAddr := args.servers[i]
		clock := clocks[i]
Weixin Deng's avatar
Weixin Deng committed
215
216
217
218
219
220
221
222
		fmt.Printf(
			"%s,%s,%s,%s,%d\n",
			dstAddr,
			clock.Start.Format("15:04:05.000000000"),
			clock.End.Format("15:04:05.000000000"),
			clock.Duration,
			clock.Duration.Microseconds(),
		)
Weixin Deng's avatar
Weixin Deng committed
223
	}
Weixin Deng's avatar
Weixin Deng committed
224
225
226
227
228
229
230
231
232
233
234
235
236
	fmt.Printf("DelayAvg: %s\n", delayAvg)
	fmt.Printf("DelayTotal: %s\n", delayTotal)
	fmt.Printf("Throughput: %dbps\n", throughput)
	time.Sleep(1 * time.Second)
}

func startServer(args *Args) {
	var wg sync.WaitGroup
	wg.Add(1)
	for i := 0; i < args.numPorts; i++ {
		addr := args.addr + ":" + args.ports[i]
		server := Server{
			addr: addr,
Weixin Deng's avatar
Weixin Deng committed
237
		}
Weixin Deng's avatar
Weixin Deng committed
238
239
240
241
242
243
		go server.Start()
	}
	wg.Wait()
}

func main() {
Weixin Deng's avatar
Weixin Deng committed
244
	setTimeZone()
Weixin Deng's avatar
Weixin Deng committed
245
246
	args := parseArgs()
	if args.startClient {
Weixin Deng's avatar
Weixin Deng committed
247
248
249
250
251
252
253
254
255
		params := make(map[string]any)
		params["Ports"] = args.numPorts
		params["Backends"] = args.numBackends
		params["Servers"] = args.numServers
		params["DataSize"] = args.dataSize
		params["ServerDelay"] = args.serverDelay
		paramsJson, err := json.MarshalIndent(params, "", "\t")
		Assert(err == nil, err)
		fmt.Println(string(paramsJson))
Weixin Deng's avatar
Weixin Deng committed
256
257
		for i := 0; i < args.repeat; i++ {
			startClient(&args)
Weixin Deng's avatar
Weixin Deng committed
258
259
		}
	}
Weixin Deng's avatar
Weixin Deng committed
260
261
262
	if args.startServer {
		startServer(&args)
	}
Weixin Deng's avatar
Weixin Deng committed
263
}