Contents

간단한 로드밸런서와 HTTP 프록시 서버 구현

레이트 리미터

Limiter interface

package limiter

type Limiter interface {
	TryTake([]byte) (bool, int)
}

레이트 리미터 객체가 구현해야할 인터페이스로 Limiter 인터페이스가 있습니다. 바이트 슬라이스를 받아서 해당 슬라이스를 기반으로 레이트 리밋을 계산하여 이번 요청이 사용 가능하면 true와 적절한 status code를 반환합니다.

slide count struct

type SlideCount struct {
	lock       *lock.Lock
	unit       int64
	maxConnPer float64
	prevTime   int64
	prevCount  int64
	curCount   int64
	nextTime   int64
}

func New(maxConnPer float64, unit time.Duration) limiter.Limiter {
	now := int64(time.Now().UnixNano())
	return &SlideCount{
		lock:       new(lock.Lock),
		unit:       int64(unit),
		maxConnPer: maxConnPer,
		prevTime:   now - int64(unit),
		prevCount:  0,
		curCount:   0,
		nextTime:   now + int64(unit),
	}
}

슬라이드 카운트 구조체는 sliding window count 방식의 레이트 리밋을 구현한 것입니다. 일반적인 케이스와 달리 제 주관적인 해석이 들어가 있으므로 코드가 좀 다릅니다.

일반적인 슬라이딩 윈도우 카운트와 동일한 아이디어를 차용했지만 저는 아예 명시적으로 단위 시간과 윈도우 크기를 정해놓고 좀 더 명확하게 구간을 움직이는 방식으로 코드를 작성했습니다.

그렇기에 이전 시간, 이전 단위 시간 당 연결 수, 현재 연결 수, 다음 시간을 의미하는 구조체 멤버가 존재합니다.

최대 연결 수(maxConnPer)는 단위 시간 당 최대 허용 연결 수입니다.

TryTake

func (s *SlideCount) TryTake(_ []byte) (bool, int) {
	s.lock.Lock()
	defer s.lock.Unlock()

	now := int64(time.Now().UnixNano())

	if now < s.prevTime {
		return false, 0
	}

	for now > s.nextTime {
		s.prevTime = s.nextTime - s.unit
		s.prevCount = s.curCount
		s.curCount = 0
		s.nextTime = s.nextTime + s.unit
	}

	req := float64(s.prevCount)*float64(-now+s.prevTime+2*s.unit)/float64(s.unit) + float64(s.curCount+1)
	if req > s.maxConnPer {
		return false, 0
	}

	s.curCount++

	return true, 0
}

슬라이딩 윈도우 카운트 방식에선 바이트 슬라이스를 받을 필요가 없기 때문에 언더바를 이용하여 매개변수를 처리하였습니다.

구조체 멤버를 많이 생성해 놓았기 때문에 코드는 비교적 간결한 편이라고 생각합니다. 먼저 현재 시간을 구한 후 매우 과거인 경우(now < s.prevTime)인 경우 잘못된 요청으로 처리하고 false를 반환합니다.

그리고 단위 시간보다 훨씬 많이 기준 시간을 벗어났을 경우 지속적으로 갱신시켜주어 슬라이딩 윈도우의 위치를 맞춰줍니다.

마지막으로 이전 시간에서 일정 비율과 현재 연결 누적 수를 계산하여 최대 연결 수를 초과하였는지 확인하고 결과를 반환합니다.

AccessControlCount

type AccessControlCount struct {
	lock       *lock.Lock
	list       map[string]limiter.Limiter
	maxConnPer float64
	unit       time.Duration
}

func New(maxConnPer float64, unit time.Duration) limiter.Limiter {
	return &AccessControlCount{
		lock:       new(lock.Lock),
		list:       nil,
		maxConnPer: maxConnPer,
		unit:       unit,
	}
}

이 구조체는 슬라이딩 윈도우를 확장하여 만든 각 클라이언트 별 레이트 리미터입니다.

TryTake

func (acc *AccessControlCount) TryTake(key []byte) (bool, int) {
	acc.lock.Lock()
	defer acc.lock.Unlock()

	if acc.list == nil {
		acc.list = make(map[string]limiter.Limiter)
	}

	slide, ok := acc.list[string(key)]
	if !ok {
		slide = slide_count.New(acc.maxConnPer, acc.unit)
		acc.list[string(key)] = slide
	}

	return slide.TryTake(nil)
}

매개변수의 바이트 슬라이스를 가지고 맵에서 특정 레이트 리미터를 꺼내와서 연산한 후 결과를 반환합니다.

로드 밸런서

balancer interface

package balancer

type Balancer interface {
	Add(string) error
	Sub(string) error
	Get(string) (string, error)
	Restore(string) error
}

balancer 인터페이스는 대상 값을 추가, 삭제하고 적절한 로직에 따라 값을 받고 돌려주는 메서드를 구현하도록 합니다.

least balancer

package least

import (
	"math"

	"github.com/diy-cloud/virtual-gate/balancer"
	"github.com/diy-cloud/virtual-gate/lock"
)

type Least struct {
	candidates map[string]int64
	l          *lock.Lock
}

func New() balancer.Balancer {
	return &Least{
		candidates: make(map[string]int64),
		l:          new(lock.Lock),
	}
}

least 패키지는 최소 연결 방식 로드밸런서를 모방합니다. 후보군을 맵에 저장하여 가지고 있습니다.

func (l *Least) Get(_ string) (string, error) {
	l.l.Lock()
	defer l.l.Unlock()

	min := int64(math.MaxInt64)
	target := ""
	for k, v := range l.candidates {
		if v < min {
			target = k
			min = v
		}
	}

	if target == "" {
		return "", balancer.ErrorAnythingNotExist()
	}

	l.candidates[target]++
	return target, nil
}

최소 연결 방식에 따라 맵을 순회하며 가장 적은 연결 수를 가진 후보를 선택하여 반환합니다.

func (l *Least) Restore(target string) error {
	l.l.Lock()
	defer l.l.Unlock()

	if _, ok := l.candidates[target]; !ok {
		return balancer.ErrorValueIsNotExist()
	}

	l.candidates[target]--
	return nil
}

최소 연결 방식은 연결 수를 저장하고 있어야 하기에 모든 작업이 끝날 경우 Restore 메서드를 호출하여 연결 수를 줄여주어야합니다.

hash balancer

package hashed

import (
	"hash"

	"github.com/diy-cloud/virtual-gate/balancer"
	"github.com/diy-cloud/virtual-gate/lock"
)

type Hashed struct {
	candidates []string
	hasher     hash.Hash64
	index      int
	l          *lock.Lock
}

func New(hasher hash.Hash64) balancer.Balancer {
	return &Hashed{
		candidates: make([]string, 0, 8),
		hasher:     hasher,
		l:          new(lock.Lock),
	}
}

hashed 패키지는 사용자의 정보를 해싱하여 나온 값을 기반으로 후보군에서 후보를 선택하는 방식입니다. 그래서 hasher 멤버를 가지며 후보군을 문자열 슬라이스로 가집니다.

func (h *Hashed) Get(id string) (string, error) {
	h.l.Lock()
	defer h.l.Unlock()

	h.hasher.Reset()
	h.hasher.Write([]byte(id))
	hashedIndex := int(h.hasher.Sum64() % uint64(len(h.candidates)))
	count := 0
	for {
		count++
		if count >= len(h.candidates) {
			return "", balancer.ErrorNoAvaliableTarget()
		}
		if hashedIndex >= len(h.candidates) {
			hashedIndex = 0
		}
		candidate := h.candidates[hashedIndex]
		if candidate != "" {
			return candidate, nil
		}
		hashedIndex = hashedIndex + 1
	}
}

후보군에서 후보를 선택하는 방식은 앞서 작성했던 대로 사용자 정보를 해싱하여 인덱스를 구한 후 유효한 후보 정보가 나올 때까지 반복합니다. 만약 너무 오래동안 유효한 후보가 나타나지 않으면 에러를 반환합니다.

func (h *Hashed) Restore(_ string) error {
	return nil
}

Restore 메서드는 해시 로드 밸런서에서는 아무 역할도 하지 않습니다.

차단기

breaker interface

package breaker

type Breaker interface {
	BreakDown(target string) error
	Restore(target string) error
	IsBrokeDown(target string) bool
}

Breaker 인터페이스는 대상이 고장났는지, 고쳐졌는지, 고장난 상태인지를 정할 수 있는 메서드를 제공합니다.

count breaker

package count_breaker

import (
	"math/rand"

	"github.com/diy-cloud/virtual-gate/breaker"
	"github.com/diy-cloud/virtual-gate/lock"
)

type CountBreaker struct {
	cache       map[string]int
	maxCount    int
	minimumRate float64
	l           *lock.Lock
}

func New(maxCount int, minimumRate float64) breaker.Breaker {
	return &CountBreaker{
		cache:       make(map[string]int),
		maxCount:    maxCount,
		minimumRate: minimumRate,
		l:           new(lock.Lock),
	}
}

CountBreaker는 카운트 기반 차단기입니다. 카운트의 최대 한도를 정할 수 있고 최소한의 연결 시도를 보장할 값을 설정할 수 있습니다.

func (c *CountBreaker) BreakDown(target string) error {
	c.l.Lock()
	defer c.l.Unlock()
	if c.cache[target] < c.maxCount {
		c.cache[target] += 1
	}
	return nil
}

func (c *CountBreaker) Restore(target string) error {
	c.l.Lock()
	defer c.l.Unlock()
	if c.cache[target] > 0 {
		c.cache[target] -= 1
	}
	return nil
}

BreakDownRestore 메서드를 통해 카운트를 늘리고 줄일 수 있습니다.

func (c *CountBreaker) IsBrokeDown(target string) bool {
	c.l.Lock()
	defer c.l.Unlock()
	fMaxCount := float64(c.maxCount)
	fTarget := float64(c.cache[target])
	if fTarget >= fMaxCount {
		fTarget = fMaxCount
	}
	maxRate := ((fMaxCount-fTarget)/fMaxCount)*(100-c.minimumRate) + c.minimumRate
	rnd := rand.Float64() * 100
	return rnd >= maxRate
}

카운트가 최대 수를 넘어갈리 없지만 넘어 갔을 때 최대 값으로 한정 짓습니다. maxRate 값을 구하는 식을 통해 카운트에 비례하여 고장 났음을 반환합니다. 최소 연결 시도를 보장하기 위한 보정 값이 추가되어 있어 미리 설정한 최소값은 깔고 계산합니다.

프록시

proxy interface

package proxy

import (
	"github.com/diy-cloud/virtual-gate/balancer"
	"github.com/diy-cloud/virtual-gate/breaker"
	"github.com/diy-cloud/virtual-gate/limiter"
)

type Proxy interface {
	Serve(address string, limiter limiter.Limiter, acl limiter.Limiter, breaker breaker.Breaker, balancer balancer.Balancer) error
}

Proxy 인터페이스는 주소와 레이트 리미터, 또 레이트 리미터, 차단기, 로드 밸런서를 받아서 서버를 실행하는 Serve 메서드가 정의되어 있습니다.

http_proxy

package http_proxy

type HttpProxy struct {
	proxyCache map[string]*httputil.ReverseProxy
	l          *lock.Lock
}

func NewHttp() *HttpProxy {
	return &HttpProxy{
		proxyCache: make(map[string]*httputil.ReverseProxy),
		l:          new(lock.Lock),
	}
}

HttpProxy 객체는 httputil 패키지의 리버스 프록시 객체를 사용합니다. 미리 연결했던 업스트림서버에 대한 리버스 프록시 인스턴스를 저장하고 있다가 재요청이 있을 경우 가져와서 사용합니다.

func (hp *HttpProxy) ServeHTTP(name string, w http.ResponseWriter, r *http.Request) {
	var upstreamServer *httputil.ReverseProxy
	hp.l.Lock()
	if l, ok := hp.proxyCache[name]; ok {
		hp.proxyCache[name] = l
	}
	if upstreamServer == nil {
		url, err := url.Parse(name)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		upstreamServer = httputil.NewSingleHostReverseProxy(url)
	}
	hp.l.Unlock()

	upstreamServer.ServeHTTP(w, r)

	hp.l.Lock()
	if _, ok := hp.proxyCache[name]; !ok {
		hp.proxyCache[name] = upstreamServer
	}
	hp.l.Unlock()
}

ServeHTTP 메서드는 위에 작성된 것처럼 미리 연결된 리버스 프록시 인스턴스가 있는 지 체크하고 있다면 그걸 사용하고 없다면 새로운 인스턴스는 만들어서 요청하는 동작을 합니다.

func (hp *HttpProxy) Serve(address string, limiter limiter.Limiter, acc limiter.Limiter, breaker breaker.Breaker, balancer balancer.Balancer) error {
	handler := func(w http.ResponseWriter, r *http.Request) {
		remote := []byte(r.RemoteAddr)

		wr := NewResponse()

		if b, code := limiter.TryTake(remote); !b {
			log.Println("HttpProxy.Serve: limiter.TryTake: false from", r.RemoteAddr)
			w.WriteHeader(code)
			return
		}

		if b, code := acc.TryTake(remote); !b {
			log.Println("HttpProxy.Serve: acl.TryTake: false from", r.RemoteAddr)
			w.WriteHeader(code)
			return
		}

		for count := 0; count < 10; count++ {
			upstreamAddress, err := balancer.Get(r.RemoteAddr)
			if err != nil {
				log.Println("HttpProxy.Serve: balancer.Get:", err, "from", r.RemoteAddr)
				w.Write([]byte(err.Error()))
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			defer balancer.Restore(upstreamAddress)

			if ok := breaker.IsBrokeDown(upstreamAddress); ok {
				log.Println("HttpProxy.Serve: breaker.IsBrokeDown: true from", r.RemoteAddr, "to", upstreamAddress)
				continue
			}

			hp.ServeHTTP(upstreamAddress, wr, r)

			if _, ok := statusCodeSet[wr.StatusCode]; ok {
				log.Println("HttpProxy.Serve: breakDown: true from", r.RemoteAddr, "to", upstreamAddress)
				breaker.BreakDown(upstreamAddress)
				continue
			}

			breaker.Restore(upstreamAddress)

			if _, ok := statusCodeSet[wr.StatusCode]; !ok {
				w.Write(wr.Body)
				w.WriteHeader(wr.StatusCode)
				break
			}
		}
	}
	server := http.Server{
		Addr:    address,
		Handler: http.HandlerFunc(handler),
	}
	return server.ListenAndServe()
}

Serve 메서드는 간단한 핸들러를 기반으로 동작합니다. 해당 핸들러는 레이트 리미터를 통해 요청을 처리할 수 있는 여유가 있는지 확인합니다. acc 객체를 통해 개별 클라이언트 당 요청을 처리할 수 있는 여유가 있는지 확인합니다.

로드 밸런서를 호출하여 적절한 업스트림서버를 받고 브레이커 객체를 통해 해당 업스트림서버가 정상적인지 확인합니다. 만약 아니라면 반복문을 다시 반복하여 밸런서에서 새로운 업스트림서버를 받아옵니다.

성공적으로 동작할 경우 브레이커 객체의 Restore 메서드를 호출하고 요청을 끝냅니다.

main.go

package main

import (
	"time"

	"github.com/diy-cloud/virtual-gate/balancer/least"
	"github.com/diy-cloud/virtual-gate/breaker/count_breaker"
	"github.com/diy-cloud/virtual-gate/limiter/slide_count"
	"github.com/diy-cloud/virtual-gate/limiter/slide_count/acc"
	"github.com/diy-cloud/virtual-gate/proxy/http_proxy"
)

func main() {
	limiter := slide_count.New(30000, time.Microsecond)
	acc := acc.New(60, time.Microsecond)
	breaker := count_breaker.New(200, 10)
	balancer := least.New()
	balancer.Add("http://127.0.0.1:8080")
	balancer.Add("http://127.0.0.1:8081")
	balancer.Add("http://127.0.0.1:8082")
	balancer.Add("http://127.0.0.1:8083")
	balancer.Add("http://127.0.0.1:8084")
	balancer.Add("http://127.0.0.1:8085")
	balancer.Add("http://127.0.0.1:8086")
	balancer.Add("http://127.0.0.1:8087")
	balancer.Add("http://127.0.0.1:8088")
	balancer.Add("http://127.0.0.1:8089")
	proxy := http_proxy.NewHttp()

	if err := proxy.Serve("0.0.0.0:9999", limiter, acc, breaker, balancer); err != nil {
		panic(err)
	}
}

마이크로세컨드 당 30000회의 요청을 수락하고 사용자 별로 마이크로세컨드 당 60회의 요청을 수락하는 레이트 리미터를 생성하였습니다. 브레이커는 최대 200회의 누적 실패 수를 기록하고 10%의 최소 연결 시도를 보장하는 인스턴스로 생성합니다. 밸런서는 최소 연결 방식이며 로컬 호스트 8080~8089까지 연결했습니다.

프록시는 HTTP 리버스 프록시로 로컬 호스트의 9999 포트에 연결하였습니다. 정상적으로 8080~8089 포트에 http 서버를 띄워놓았다면 9999 포트로 접속했을 때 돌아가며 요청을 처리하는 걸 확인할 수 있습니다.