pull/25/head
sunface 7 years ago
parent aca6193dec
commit 1e53668872

BIN
.DS_Store vendored

Binary file not shown.

@ -0,0 +1,125 @@
package api
import (
"net/http"
"net/rpc"
"tfgo/tfe/api/async"
"time"
"github.com/mafanr/juz/api/filter"
"github.com/mafanr/juz/api/manage"
"github.com/mafanr/juz/api/stats"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
_ "github.com/go-sql-driver/mysql"
"github.com/labstack/echo"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
type ApiServer struct {
manage *manage.Manage
router *router
}
func (p *ApiServer) Start() {
g.L.Info("start tfe..")
// 初始化mysql连接
misc.InitMysql()
// 从mysql中加载所有的api信息到内存中
p.loadData()
p.manage = &manage.Manage{}
go p.manage.Start()
p.router = &router{p, &filter.Filter{}}
// 连接到traffic rpc服务
p.initTraffic()
// 启动proxy http服务
go p.listen()
// 启动metrics收集
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":6062", nil)
}
func (o *ApiServer) Shutdown() {
g.L.Info("shutdown tfe..")
}
func (p *ApiServer) listen() {
e := echo.New()
// 回调相关
//同步回调接口
e.Any("/*", p.router.route, timing)
//异步回调接口
e.POST("/notify", async.Notify)
e.Logger.Fatal(e.Start(":" + misc.Conf.Api.Port))
}
func timing(f echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
ts := time.Now()
rid := (ts.UnixNano()/10)*10 + misc.Conf.Api.ServerID
g.L.Info("New request accepted", zap.Int64("rid", rid), zap.String("ip", c.RealIP()))
c.Set("rid", rid)
defer func() {
// 统计请求指标
apiID := c.Get("api_id").(string)
service := c.Get("service").(string)
label := c.Get("label").(string)
stats.Req.With(prometheus.Labels{
"api_id": apiID,
"service": service,
"label": label,
}).Observe(float64(time.Now().Sub(ts).Nanoseconds() / 1e6))
err := c.Get("error_msg")
if err == nil {
g.L.Info("Request success", zap.Int64("rid", rid))
} else {
g.L.Info("Request failed", zap.Int64("rid", rid), zap.Error(err.(error)))
}
}()
return f(c)
}
}
func (as *ApiServer) initTraffic() {
r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port)
if err != nil {
g.L.Fatal("connect to raffic error", zap.Error(err))
}
as.router.Filter.Rpc = r
// 定时检测rpc连接的存活性
go func() {
for {
var res int
err := as.router.Filter.Rpc.Call("RateLimiter.Ping", 1, &res)
if err != nil || res != 1 {
g.L.Warn("rpc ping failed", zap.Error(err))
r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port)
if err != nil {
g.L.Warn("re-connect to traffic error", zap.Error(err))
time.Sleep(2 * time.Second)
continue
}
as.router.Filter.Rpc = r
g.L.Info("re-connect to traffic ok")
}
time.Sleep(3 * time.Second)
}
}()
}

@ -0,0 +1,29 @@
package api
import (
"net/http"
"testing"
"time"
"github.com/mafanr/g"
"github.com/stretchr/testify/assert"
)
func TestTfeStartStop(t *testing.T) {
p := &Proxy{}
g.InitConfig("../tfe.conf")
g.InitLogger()
go func() {
p.Start()
}()
time.Sleep(2 * time.Second)
resp, err := http.Get("http://localhost:" + g.Conf.Common.Port + "/service/api?service_id=" + g.TEST_API_NAME)
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)
p.Shutdown()
}

@ -0,0 +1,7 @@
package filter
/*---------------访问授权和验签-----------------*/
// AK/SK的方式达成以下目标
//1. 鉴别用户是否有访问应用的权限: ak能否对上
//2. 鉴别访问数据是否完整 md5
//3. 验证用户签名: RSA, 用sk

@ -0,0 +1,53 @@
package filter
import (
"fmt"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/misc"
)
/*
*/
func (f *Filter) checkBW(r *req.Request) error {
if r.BwStrategy == nil || r.BwStrategy.Type == 0 {
return nil
}
inList := false
var k, v string
for _, bw := range r.BwStrategy.BwList {
if bw.Type == misc.IP_TYPE {
// IP类型
if bw.Val == r.ClientIP {
inList = true
k = bw.Key
v = bw.Val
break
}
} else {
// 参数类型
v, ok := r.Params[bw.Key]
if ok && v == bw.Val {
inList = true
k = bw.Key
v = bw.Val
break
}
}
}
if r.BwStrategy.Type == misc.BLACK_LIST {
if inList { // 在黑名单上
return fmt.Errorf("Black list%s, user %s is blocked", k, v)
}
return nil
}
if inList { // 在白名单上
return nil
}
return fmt.Errorf("White list: %s, user %s is blocked", k, v)
}

@ -0,0 +1,96 @@
package filter
import (
"net/http"
"net/rpc"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/api/stats"
"github.com/mafanr/g"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
/*
.
.
fileter.
*/
// 可以在以下四个地方添加拦截插件
//1.请求开始前
//2.接受到服务的返回数据,返回给客户端之前
//3.请求失败后
// 拦截器顺序
//1.拦截器分为网关自带和用户定义的
//2.每个拦截器应该具有优先级编号
//3.按照优先级来调用拦截器
type Filter struct {
Rpc *rpc.Client
}
type Result struct {
Status int
Ecode int
Emsg string
}
// 收到请求路由前的hook
func (f *Filter) BeforeRoute(r *req.Request) Result {
// 黑白名单
err := f.checkBW(r)
if err != nil {
g.L.Info("BeforeRoute failed", zap.Error(err))
// 统计被阻挡数
stats.Limits.With(prometheus.Labels{
"api_id": r.Api.APIID,
"service": r.Api.Service,
"label": r.Api.Label,
}).Inc()
return Result{http.StatusForbidden, g.ForbiddenC, g.ForbiddenE}
}
// 参数校验
err = f.verifyParam(r)
if err != nil {
g.L.Info("BeforeRoute failed", zap.Error(err))
return Result{http.StatusBadRequest, g.ParamInvalidC, g.ParamInvalidE}
}
// 流量路由
f.trafficRoute(r)
// 检查当
return Result{}
}
// 调用目标服务前的hook
func (f *Filter) BeforeCall(r *req.Request) Result {
code, err := f.IncApiRate(r)
if err != nil {
// 统计被阻挡数
stats.Limits.With(prometheus.Labels{
"api_id": r.Api.APIID,
"service": r.Api.Service,
"label": r.Api.Label,
}).Inc()
return Result{code, g.AccessLimitedC, err.Error()}
}
return Result{}
}
// 调用目标服务完成后的hook
func (f *Filter) AfterCall(r *req.Request) Result {
// 并发数减1
f.DecApiRate(r)
return Result{}
}
func (f *Filter) RouteFailed(r *req.Request) Result {
return Result{}
}

@ -0,0 +1,32 @@
package filter
import (
"fmt"
"regexp"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/misc"
)
/*
*/
func (f *Filter) verifyParam(r *req.Request) error {
if r.Api.VerifyOn == misc.PARAM_VERIFY_OFF {
return nil
}
for k, v := range r.Params {
regI, ok := r.Api.ParamRules.Load(k)
if !ok {
continue
}
reg := regI.(*regexp.Regexp)
if !reg.MatchString(v) { // 参数不合法
return fmt.Errorf("param %s,value %s, verify failed", k, v)
}
}
return nil
}

@ -0,0 +1,63 @@
package filter
import (
"errors"
"net/http"
"strings"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"go.uber.org/zap"
)
func (f *Filter) DecApiRate(r *req.Request) {
// 并发控制减1
if r.Api.TrafficStrategy != 0 {
if r.TrafficStrategy.Concurrent != misc.STRATEGY_NO_LIMIT {
res := &misc.TrafficConRes{}
err := f.Rpc.Call("RateLimiter.DecApiRate", &misc.TrafficConReq{r.Api.APIID, r.Api.TrafficStrategy, ""}, &res)
if err != nil {
if !strings.Contains(err.Error(), "shut down") {
g.L.Warn("rpc出错了", zap.Error(err))
}
}
}
}
}
func (f *Filter) IncApiRate(r *req.Request) (int, error) {
if r.Api.TrafficStrategy != 0 {
// 获取用户流量控制的参数和值
var val string
param := r.TrafficStrategy.Param
if param != "" {
// 找到该参数对应的值传给traffic服务进行限制
for k, v := range r.Params {
if k == param {
val = v
}
}
}
if r.TrafficStrategy.QPS != misc.STRATEGY_NO_LIMIT || r.TrafficStrategy.Concurrent != misc.STRATEGY_NO_LIMIT || val != "" { // 至少有一项受到限制
res := &misc.TrafficConRes{}
err := f.Rpc.Call("RateLimiter.IncApiRate", &misc.TrafficConReq{r.Api.APIID, r.Api.TrafficStrategy, val}, &res)
if err != nil {
if !strings.Contains(err.Error(), "shut down") {
g.L.Warn("rpc出错了", zap.Error(err))
}
return 0, nil
}
if !res.Suc {
return http.StatusTooManyRequests, errors.New("当前访问数过多,超过了预设的阀值")
}
}
}
return 0, nil
}

@ -0,0 +1,48 @@
package filter
import (
"math/rand"
"strings"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"go.uber.org/zap"
)
// 检查是否触发traffic route
// 一旦触发把本次请求的api name换成触发后的结果
func (f *Filter) trafficRoute(r *req.Request) {
if r.Api.TrafficOn == misc.TRAFFIC_OFF {
// 流量路由未开启,返回
return
}
if r.Api.TrafficAPI == "" {
// 路由到的api name为空返回
return
}
apiI, ok := misc.Apis.Load(r.Api.TrafficAPI)
if !ok {
// api不存在返回
return
}
api := apiI.(*misc.API)
// 是否在路由ip列表中如果在直接路由
if strings.Contains(r.Api.TrafficIPs, r.ClientIP) {
g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "Canary by ip", zap.String("old_api", r.Api.APIID), zap.String("new_api", api.APIID), zap.String("client_ip", r.ClientIP))
r.Api = api
return
}
// 是否进行流量路由,落到了[0,trafficRatio]的区间,才进行路由
n := rand.Intn(101)
if n > r.Api.TrafficRatio {
return
}
g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "Canary by random", zap.String("old_api", r.Api.APIID), zap.String("new_api", api.APIID))
r.Api = api
}

@ -0,0 +1,182 @@
package api
import (
"encoding/json"
"fmt"
"regexp"
"sync"
"time"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"github.com/sunface/talent"
"go.uber.org/zap"
)
func (p *ApiServer) loadData() {
// 插入一条Test数据
now := time.Now()
date := talent.Time2StringSecond(time.Now())
version := talent.Time2Version(now)
g.DB.Exec(fmt.Sprintf("insert into api_release (service,api_id,description,mock_data,route_type,route_addr,create_date) values('admin','admin.test.get.v1','','',1,'http://httpbin.org/get','%s')", date))
g.DB.Exec(fmt.Sprintf("insert into api_define (service,api_id,description,mock_data,route_type,route_addr,revise_version,release_version,create_date) values('admin','admin.test.get.v1','','',1,'http://httpbin.org/get','%s','%s','%s')", version, version, date))
lastLoadTime = time.Now()
// 加载所有数据
p.loadAll()
// 定时加载最新的信息
go p.loadUpdated()
}
var lastLoadTime time.Time
func (p *ApiServer) loadAll() {
// 加载所有apis
apisS := make([]*misc.API, 0)
err := g.DB.Select(&apisS, "select * from api_release")
if err != nil {
g.L.Fatal("load apis error!", zap.Error(err))
}
an := make([]string, 0, len(apisS))
for _, api := range apisS {
api.ParamRules = &sync.Map{}
an = append(an, api.APIID)
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
// 解析param rules
var prs []*misc.ParamRule
json.Unmarshal(d, &prs)
for _, pr := range prs {
reg, _ := regexp.Compile(pr.ParamRule)
api.ParamRules.Store(pr.Param, reg)
}
}
}
for _, api := range apisS {
misc.Apis.Store(api.APIID, api)
}
// 加载所有strategy
strategies := make([]*misc.Strategy, 0)
err = g.DB.Select(&strategies, "select * from strategy")
if err != nil {
g.L.Fatal("load strategies error!", zap.Error(err))
}
for _, s := range strategies {
// 生成具体的策略内容
switch s.Type {
case misc.STRATEGY_BWLIST:
t := &misc.BwStrategy{
Type: s.Type,
BwList: make([]*misc.BW, 0),
}
json.Unmarshal([]byte(s.Content), &t.BwList)
s.DetailContent = t
case misc.STRATEGY_RETRY:
t := &misc.RetryStrategy{}
json.Unmarshal([]byte(s.Content), &t)
s.DetailContent = t
case misc.STRATEGY_TRAFFIC:
t := &misc.TrafficStrategy{}
json.Unmarshal([]byte(s.Content), &t)
s.DetailContent = t
}
misc.Strategies.Store(s.ID, s)
}
}
func (p *ApiServer) loadUpdated() {
wg := &sync.WaitGroup{}
for {
wg.Add(2)
// 为了防止访问时恰好在更新数据这里给出2秒的容忍值
lastT := talent.Time2String(lastLoadTime.Add(-2 * time.Second))
lastLoadTime = time.Now()
// 加载apis
go func() {
defer wg.Done()
apisS := make([]*misc.API, 0)
err := g.DB.Select(&apisS, fmt.Sprintf("select * from api_release where modify_date >= '%s'", lastT))
if err != nil {
g.L.Error("load apis error!", zap.Error(err))
return
}
if len(apisS) == 0 {
return
}
// 加载参数规则
an := make([]string, 0, len(apisS))
for _, api := range apisS {
api.ParamRules = &sync.Map{}
an = append(an, api.APIID)
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
// 解析param rules
var prs []*misc.ParamRule
json.Unmarshal(d, &prs)
for _, pr := range prs {
reg, _ := regexp.Compile(pr.ParamRule)
api.ParamRules.Store(pr.Param, reg)
}
}
}
for _, api := range apisS {
misc.Apis.Store(api.APIID, api)
}
}()
// 加载策略
go func() {
defer wg.Done()
strategies := make([]*misc.Strategy, 0)
query := fmt.Sprintf("select * from strategy where modify_date >= '%s'", lastT)
err := g.DB.Select(&strategies, query)
if err != nil {
g.L.Error("load strategies error!", zap.Error(err), zap.String("query", query))
return
}
for _, s := range strategies {
// 生成具体的策略内容
switch s.Type {
case misc.STRATEGY_BWLIST:
t := &misc.BwStrategy{
Type: s.Type,
BwList: make([]*misc.BW, 0),
}
json.Unmarshal([]byte(s.Content), &t.BwList)
s.DetailContent = t
case misc.STRATEGY_RETRY:
t := &misc.RetryStrategy{}
json.Unmarshal([]byte(s.Content), &t)
s.DetailContent = t
case misc.STRATEGY_TRAFFIC:
t := &misc.TrafficStrategy{}
json.Unmarshal([]byte(s.Content), &t)
s.DetailContent = t
}
misc.Strategies.Store(s.ID, s)
}
}()
wg.Wait()
time.Sleep(10 * time.Second)
}
}

@ -0,0 +1,37 @@
package api
import (
"sync"
"testing"
"github.com/mafanr/juz/g"
"github.com/stretchr/testify/assert"
)
func TestLoadRow(t *testing.T) {
g.InitConfig("../tfe.conf")
g.InitLogger()
g.InitMysql()
p := &Proxy{}
p.apis = &sync.Map{}
p.loadAPIRow(g.TEST_API_NAME)
_, ok := p.apis.Load(g.TEST_API_NAME)
assert.True(t, ok)
}
func TestLoadAll(t *testing.T) {
g.InitConfig("../tfe.conf")
g.InitLogger()
g.InitMysql()
p := &Proxy{}
p.apis = &sync.Map{}
p.loadAllAPIs()
_, ok := p.apis.Load(g.TEST_API_NAME)
assert.True(t, ok)
}

@ -0,0 +1,834 @@
package manage
import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/mafanr/juz/api/manage/audit"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"github.com/labstack/echo"
"github.com/sunface/talent"
"go.uber.org/zap"
)
/*---------------------API相关-----------------------------*/
func (m *Manage) QueryAPI(c echo.Context) error {
service := talent.FormValue(c, "service")
q := talent.FormValue(c, "q")
pageS := talent.FormValue(c, "page")
if service == "" || pageS == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
page, _ := strconv.Atoi(pageS)
if page <= 0 {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
role := talent.FormValue(c, "app_priv")
if !m.canView(role) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
apis := make([]*misc.API, 0)
var query string
if q == "" {
query = fmt.Sprintf("select * from api_define where service='%s' order by modify_date desc limit %d offset %d", service, g.PER_PAGE, g.PER_PAGE*(page-1))
} else {
query = fmt.Sprintf("select * from api_define where service='%s' and ", service) + "api_id like '%" + q + "%' " + fmt.Sprintf(" order by modify_date desc limit %d offset %d", g.PER_PAGE, g.PER_PAGE*(page-1))
}
err := g.DB.Select(&apis, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
for _, api := range apis {
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
d1 := talent.Bytes2String(d)
api.ParamTable = &d1
}
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: apis,
})
}
func (m *Manage) CountAPI(c echo.Context) error {
service := talent.FormValue(c, "service")
q := talent.FormValue(c, "q")
if service == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
var query string
if q == "" {
query = fmt.Sprintf("select count(1) from api_release where service='%s'", service)
} else {
query = fmt.Sprintf("select count(1) from api_release where service='%s' and api_id like '%%", service) + q + "%'"
}
rows, err := g.DB.Query(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
var total int
rows.Next()
rows.Scan(&total)
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: total,
})
}
func (m *Manage) DefineAPI(c echo.Context) error {
api, ecode, emsg := m.parseAPI(c)
if ecode != 0 {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: ecode,
Message: emsg,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
action := talent.FormValue(c, "action")
now := time.Now()
date := talent.Time2StringSecond(now)
pr := g.B64.EncodeToString(talent.String2Bytes(*api.ParamTable))
if action == "create" {
query := fmt.Sprintf(`insert into api_define (api_id,path_type,service,description,route_type,route_addr,route_proto,bw_strategy,retry_strategy,traffic_strategy,mock_data,traffic_on,traffic_api,traffic_ratio,traffic_ips,verify_on,param_rules,cached_time,revise_version,create_date,label)
values ('%s','%d','%s','%s','%d','%s','%d','%d','%d','%d','%s','%d','%s','%d','%s','%d','%s','%d','%s', '%s','%s')`,
api.APIID, api.PathType, api.Service, *api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, api.BwStrategy, api.RetryStrategy, api.TrafficStrategy, *api.MockData, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, pr, api.CachedTime, talent.Time2Version(now), date, api.Label)
_, err := g.DB.Exec(query)
if err != nil {
if strings.Contains(err.Error(), g.DUP_KEY_ERR) {
return c.JSON(http.StatusConflict, g.Result{
Status: http.StatusConflict,
ErrCode: g.AlreadyExistC,
Message: g.AlreadyExistE,
})
}
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 初始化api发布表并设置为未发布状态
// 这里把不能再修改的值进行初始化
query = fmt.Sprintf(`insert into api_release (api_id,path_type,service,route_addr,status,create_date) values ('%s','%d','%s','%s','%d','%s')`,
api.APIID, api.PathType, api.Service, api.RouteAddr, misc.API_OFFLINE, date)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
audit.Log(c.FormValue("username"), api.Service, audit.TypeApi, api.APIID, audit.OpCreate, c.FormValue("api"), "")
} else {
query := fmt.Sprintf("update api_define set description='%s',route_type='%d',route_addr='%s',route_proto='%d',bw_strategy='%d',retry_strategy='%d',traffic_strategy='%d',mock_data='%s',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s',cached_time='%d',label='%s' where api_id='%s'",
*api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, api.BwStrategy, api.RetryStrategy, api.TrafficStrategy, *api.MockData, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, pr, api.CachedTime, api.Label, api.APIID)
res, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
n, _ := res.RowsAffected()
if n == 0 {
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
ErrCode: g.NoUpdateHappenedC,
Message: g.NoUpdateHappenedE,
})
}
query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), api.APIID)
g.DB.Exec(query)
audit.Log(c.FormValue("username"), api.Service, audit.TypeApi, api.APIID, audit.OpEdit, c.FormValue("api"), "")
}
// 查询并返回最新的api
api1 := misc.API{}
g.DB.Get(&api1, fmt.Sprintf("select * from api_define where api_id='%s'", api.APIID))
if api1.ParamTable != nil {
d, _ := g.B64.DecodeString(*api1.ParamTable)
d1 := talent.Bytes2String(d)
api1.ParamTable = &d1
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: api1,
})
}
// API下线10分钟后方可删除
func (m *Manage) DeleteAPI(c echo.Context) error {
service := talent.FormValue(c, "service")
apiID := talent.FormValue(c, "api_id")
userID := talent.FormValue(c, "username")
if apiID == "" || userID == "" || service == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
// 查询API的发布状态只有下线10分钟后才能删除
query := fmt.Sprintf("select status,modify_date from api_release where api_id='%s'", apiID)
rows, err := g.DB.Query(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
if !rows.Next() {
return c.JSON(http.StatusNotFound, g.Result{
Status: http.StatusNotFound,
ErrCode: APIOfflineC,
Message: APIOfflineE,
})
}
var status int
var ud string
rows.Scan(&status, &ud)
// 已经发布的不能删除
if status == misc.API_RELEASED {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: ApiStillReleasedC,
Message: ApiStillReleasedE,
})
}
// 下线不到30秒不能删除
t, _ := talent.StringToTime(ud)
if time.Now().Sub(t) < 30*time.Second {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: ApiInactiveNotLongEnoughC,
Message: ApiInactiveNotLongEnoughE,
})
}
// 将api发布表的状态设置为已删除
query = fmt.Sprintf("delete from api_release where api_id='%s'", apiID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 查询api定义,为了记录审计日志
api := misc.API{}
query = fmt.Sprintf("select * from api_define where api_id='%s'", apiID)
err = g.DB.Get(&api, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
}
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
d1 := talent.Bytes2String(d)
api.ParamTable = &d1
}
b, _ := json.Marshal(api)
// 从api定义表删除
query = fmt.Sprintf("delete from api_define where api_id='%s'", apiID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
audit.Log(userID, service, audit.TypeApi, apiID, audit.OpDelete, talent.Bytes2String(b), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func (m *Manage) parseAPI(c echo.Context) (*misc.API, int, string) {
apiR := c.FormValue("api")
api := &misc.API{}
err := json.Unmarshal([]byte(apiR), &api)
if err != nil {
g.L.Info("parse api", zap.Error(err), zap.String("api", string(apiR)))
return nil, g.ParamInvalidC, g.ParamInvalidE
}
if api.Service == "" {
return nil, ServiceEmptyC, ServiceEmptyE
}
if api.PathType != misc.OFF && api.PathType != misc.ON {
return nil, ApiPathTypeInvalidC, ApiPathTypeInvalidE
}
if api.PathType == misc.OFF {
// 非路径格式api名组成形式
if !talent.OnlyAlphaNumAndDot(api.APIID) {
return nil, ApiOnlyAlphaNumAndDotC, ApiOnlyAlphaNumAndDotE
}
// 检查api id的前缀
if !strings.HasPrefix(api.APIID, api.Service+".") {
return nil, ApiWithServicePrefixC, ApiWithServicePrefixE
}
} else {
// @todo,检查路径形式的参数是否合法,/a/b/c
//路径不能为保留的
if !talent.OnlyAlphaNumAndUri(api.APIID) {
return nil, ApiOnlyAlphaNumAndUriC, ApiOnlyAlphaNumAndUriE
}
name := api.APIID[:len(api.APIID)-3]
if name == "/service/api" || name == "/notify" {
return nil, ApiReservePathC, ApiReservePathE
}
}
// 检查api id的后缀
if api.APIID[len(api.APIID)-2] != 'v' || api.APIID[len(api.APIID)-3] != '.' {
return nil, ApiWithServiceSuffixC, ApiWithServiceSuffixE
}
if api.RouteAddr == "" || strings.TrimSpace(api.RouteAddr) == "http://" || strings.TrimSpace(api.RouteAddr) == "https://" {
return nil, RouteAddrEmptyC, RouteAddrEmptyE
}
if api.RouteAddr != "" {
if !strings.HasPrefix(api.RouteAddr, "http") {
return nil, RouteAddrWithHTTPPrefixC, RouteAddrWithHTTPPrefixE
}
}
if (api.RouteProto != 1) && (api.RouteProto != 2) {
return nil, RouteProtoInvalidC, RouteProtoInvalidE
}
if api.TrafficAPI != "" {
_, ok := misc.Apis.Load(api.TrafficAPI)
if !ok {
return nil, ApiNotExistC, ApiNotExistE
}
}
if api.TrafficRatio < 0 || api.TrafficRatio > 100 {
return nil, TrafficRatioInvalidC, TrafficRatioInvalidE
}
if (api.TrafficOn != 0) && (api.TrafficOn != 1) {
api.TrafficOn = 0
}
if (api.VerifyOn != 0) && (api.VerifyOn != 1) {
api.VerifyOn = 0
}
if (api.CachedTime < 0) || (api.CachedTime > 30) {
api.CachedTime = 0
}
return api, 0, ""
}
func (m *Manage) APIRelease(c echo.Context) error {
apiID := c.FormValue("api_id")
userID := c.FormValue("username")
if apiID == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
// 查询最新的api定义
api := misc.API{}
query := fmt.Sprintf("select * from api_define where api_id='%s'", apiID)
err := g.DB.Get(&api, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 更新release
query = fmt.Sprintf("update api_release set description='%s',route_type='%d',route_addr='%s',route_proto='%d',mock_data='%s',retry_strategy='%d',bw_strategy='%d',traffic_strategy='%d',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s', cached_time='%d',status='%d',label='%s' where api_id='%s'",
*api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, *api.MockData, api.RetryStrategy, api.BwStrategy, api.TrafficStrategy, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, *api.ParamTable, api.CachedTime, misc.API_RELEASED, api.Label, api.APIID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", api.ReviseVersion, api.APIID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
d1 := talent.Bytes2String(d)
api.ParamTable = &d1
}
b, _ := json.Marshal(api)
audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpRelease, talent.Bytes2String(b), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func (m *Manage) APIOffline(c echo.Context) error {
apiID := c.FormValue("api_id")
userID := c.FormValue("username")
if apiID == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
query := fmt.Sprintf("update api_release set status='%d' where api_id='%s'",
misc.API_OFFLINE, apiID)
_, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 更新api_define表的发布时间
query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", "", apiID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 查询已发布的api定义
api := misc.API{}
query = fmt.Sprintf("select * from api_release where api_id='%s'", apiID)
err = g.DB.Get(&api, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
}
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
d1 := talent.Bytes2String(d)
api.ParamTable = &d1
}
b, _ := json.Marshal(api)
audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpOffline, talent.Bytes2String(b), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
// 请求参数验证
func (m *Manage) VerifyParamRule(c echo.Context) error {
param := strings.TrimSpace(c.FormValue("param"))
rule := strings.TrimSpace(c.FormValue("rule"))
testData := strings.TrimSpace(c.FormValue("test_data"))
if param == "" || rule == "" || testData == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
// 验证正则是否合法
r, err := regexp.Compile(rule)
if err != nil {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: InvalidVerifyTableC,
Message: InvalidVerifyTableE,
})
}
// 验证测试数据跟正则是否匹配
if !r.Match([]byte(testData)) {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: InvalidVerifyRuleC,
Message: InvalidVerifyRuleE,
})
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
//批量设置策略
func (m *Manage) APIBatchStrategy(c echo.Context) error {
apiIDS := strings.TrimSpace(c.FormValue("api_ids"))
bw := strings.TrimSpace(c.FormValue("batch_bw"))
retry := strings.TrimSpace(c.FormValue("batch_retry"))
traffic := strings.TrimSpace(c.FormValue("batch_traffic"))
service := c.FormValue("service")
if apiIDS == "" || (bw == "" && retry == "" && traffic == "") || service == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
var apiIds []string
err := json.Unmarshal([]byte(apiIDS), &apiIds)
if err != nil {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
now := time.Now()
for _, apiID := range apiIds {
var query = "update api_define set"
if bw != "" {
query = fmt.Sprintf("%s bw_strategy='%s', ", query, bw)
}
if retry != "" {
query = fmt.Sprintf("%s retry_strategy='%s', ", query, retry)
}
if traffic != "" {
query = fmt.Sprintf("%s traffic_strategy='%s'", query, traffic)
}
query = fmt.Sprintf("%s where api_id = '%s'", query, apiID)
res, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
n, _ := res.RowsAffected()
if n == 0 {
continue
}
// 更新版本号
query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), apiID)
g.DB.Exec(query)
}
userID := c.FormValue("username")
audit.Log(userID, service, audit.TypeBatch, fmt.Sprintf("bw: %s,retry: %s", bw, retry), audit.OpCreate, apiIDS, "批量添加策略")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
// 批量删除策略
func (m *Manage) APIBatchDelStrategy(c echo.Context) error {
apiIDS := strings.TrimSpace(c.FormValue("api_ids"))
service := c.FormValue("service")
if apiIDS == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
var apiIds []string
err := json.Unmarshal([]byte(apiIDS), &apiIds)
if err != nil {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
tp, _ := strconv.Atoi(c.FormValue("type"))
now := time.Now()
for _, apiID := range apiIds {
var query string
switch tp {
case misc.STRATEGY_BWLIST:
query = fmt.Sprintf("update api_define set bw_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID)
case misc.STRATEGY_RETRY:
query = fmt.Sprintf("update api_define set retry_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID)
case misc.STRATEGY_TRAFFIC:
query = fmt.Sprintf("update api_define set traffic_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID)
default:
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
res, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
n, _ := res.RowsAffected()
if n == 0 {
continue
}
// 更新版本号
query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), apiID)
g.DB.Exec(query)
}
userID := c.FormValue("username")
var msg string
switch tp {
case misc.STRATEGY_BWLIST:
msg = "White/Black List"
case misc.STRATEGY_RETRY:
msg = "Timeout/Retry"
case misc.STRATEGY_TRAFFIC:
msg = "Traffic Control"
}
audit.Log(userID, service, audit.TypeBatch, msg, audit.OpDelete, apiIDS, "Batch delete strategy")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func (m *Manage) APIBatchRelease(c echo.Context) error {
apiIDS := strings.TrimSpace(c.FormValue("api_ids"))
if apiIDS == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
var apiIds []string
err := json.Unmarshal([]byte(apiIDS), &apiIds)
if err != nil {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
//必须是该service的管理员或者创建者
if !m.canOperate(talent.FormValue(c, "app_priv")) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
for _, apiID := range apiIds {
// 查询最新的api定义
api := misc.API{}
query := fmt.Sprintf("select * from api_define where api_id='%s'", apiID)
err := g.DB.Get(&api, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 更新release
query = fmt.Sprintf("update api_release set description='%s',route_type='%d',route_addr='%s',route_proto='%d',mock_data='%s',retry_strategy='%d',bw_strategy='%d',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s', cached_time='%d',status='%d',label='%s' where api_id='%s'",
*api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, *api.MockData, api.RetryStrategy, api.BwStrategy, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, *api.ParamTable, api.CachedTime, misc.API_RELEASED, api.Label, api.APIID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", api.ReviseVersion, api.APIID)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
userID := c.FormValue("username")
if api.ParamTable != nil {
d, _ := g.B64.DecodeString(*api.ParamTable)
d1 := talent.Bytes2String(d)
api.ParamTable = &d1
}
b, _ := json.Marshal(api)
audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpRelease, talent.Bytes2String(b), "")
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}

@ -0,0 +1,136 @@
package audit
import (
"fmt"
"net/http"
"strconv"
"github.com/mafanr/g"
"github.com/sunface/talent"
"github.com/labstack/echo"
"go.uber.org/zap"
)
const (
TypeService = 1
TypeApi = 2
TypeStrategy = 3
TypePrivilegy = 4
TypeBatch = 5
OpCreate = 1
OpEdit = 2
OpRelease = 3
OpOffline = 4
OpDelete = 5
)
func Log(userID string, service string, targetType int, targetID string, opType int, content string, desc string) {
newc := g.B64.EncodeToString(talent.String2Bytes(content))
query := fmt.Sprintf("insert into audit_log (user_id,service,target_type,target_id,op_type,content,description) values ('%s','%s','%d','%s','%d','%s','%s')",
userID, service, targetType, targetID, opType, newc, desc)
_, err := g.DB.Exec(query)
if err != nil {
g.L.Info("record audit log error", zap.Error(err), zap.String("query", query))
}
}
func Count(c echo.Context) error {
tt := c.FormValue("target_type")
tid := c.FormValue("target_id")
if tt == "" || tid == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
var query string
if tt == "0" {
query = fmt.Sprintf("select count(1) from audit_log where service in (%s)", tid)
} else {
query = fmt.Sprintf("select count(1) from audit_log where target_id='%s' and target_type='%s'", tid, tt)
}
rows, err := g.DB.Query(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
var total int
rows.Next()
rows.Scan(&total)
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: total,
})
}
type AuditLog struct {
ID int `db:"id" json:"-"`
UserID string `db:"user_id" json:"user_id"`
Service string `db:"service" json:"service"`
TargetType string `db:"target_type" json:"target_type"`
TargetID string `db:"target_id" json:"target_id"`
OpType string `db:"op_type" json:"op_type"`
Content string `db:"content" json:"content"`
Desc string `db:"description" json:"desc"`
ModifyDate string `db:"modify_date" json:"modify_date"`
}
func Load(c echo.Context) error {
tt := c.FormValue("target_type")
tid := c.FormValue("target_id")
pageS := c.FormValue("page")
if tt == "" || tid == "" || pageS == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
page, _ := strconv.Atoi(pageS)
if page <= 0 {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
rs := make([]AuditLog, 0)
var query string
if tt == "0" {
query = fmt.Sprintf("select * from audit_log where service in (%s) order by modify_date desc limit %d offset %d", tid, g.PER_PAGE, g.PER_PAGE*(page-1))
} else {
query = fmt.Sprintf("select * from audit_log where target_id='%s' and target_type='%s' order by modify_date desc limit %d offset %d", tid, tt, g.PER_PAGE, g.PER_PAGE*(page-1))
}
err := g.DB.Select(&rs, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
for i, l := range rs {
b, _ := g.B64.DecodeString(l.Content)
rs[i].Content = talent.Bytes2String(b)
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: rs,
})
}

@ -0,0 +1,68 @@
package manage
const (
CodeInvalidUser = 10001
ErrInvalidUser = "Username invalid"
InvalidVerifyTableC = 10002
InvalidVerifyTableE = "Param verify rule is not a legal regexp"
InvalidVerifyRuleC = 10003
InvalidVerifyRuleE = "The test data is not satisfying the verify rule"
ApiWithServicePrefixC = 10004
ApiWithServicePrefixE = "API name must be prefix with service name"
ApiWithServiceSuffixC = 10014
ApiWithServiceSuffixE = "API ID must suffix with version"
ServiceEmptyC = 10005
ServiceEmptyE = "Service name cant be empty"
ApiOnlyAlphaNumAndDotC = 10006
ApiOnlyAlphaNumAndDotE = "API name can only be consisted of alphabet and numberic"
RouteAddrWithHTTPPrefixC = 10007
RouteAddrWithHTTPPrefixE = "Backend url must prefix with http:// or https://"
RouteAddrEmptyC = 10008
RouteAddrEmptyE = "Backend url cant be empty"
RouteProtoInvalidC = 10009
RouteProtoInvalidE = "Backend type invalid"
ReqTimeoutInvalidC = 10010
ReqTimeoutInvalidE = "Timeout must be in (0,60]"
RetryTimesInvalidC = 10011
RetryTimesInvalidE = "Retry times must be in (0,5]"
RetryIntvInvalidC = 10012
RetryIntvInvalidE = "Retry interval must be in (0,30]"
TrafficRatioInvalidC = 10013
TrafficRatioInvalidE = "Traffic ratio must be in [0,100]"
ApiPathTypeInvalidC = 10014
ApiPathTypeInvalidE = "API URL type must be 0 or 1"
ApiNotExistE = "Api not exist"
ApiNotExistC = 10015
ApiOnlyAlphaNumAndUriC = 10016
ApiOnlyAlphaNumAndUriE = "API name can only be consisted of alphabet,numberic and /"
ApiReservePathC = 10017
ApiReservePathE = "You cant use the reserverd name"
APIOfflineE = "API not released"
APIOfflineC = 1058
ApiStillReleasedE = "API still being released"
ApiStillReleasedC = 1059
ApiInactiveNotLongEnoughE = "You cant delete api until 30 seconds after offline"
ApiInactiveNotLongEnoughC = 1060
StrategyNameExistE = "Strategy name already exist"
StrategyNameExistc = 1061
)

@ -0,0 +1,80 @@
package manage
import (
"fmt"
"net/http"
"strings"
"github.com/mafanr/g"
"github.com/labstack/echo"
"go.uber.org/zap"
)
func (m *Manage) QueryLabels(c echo.Context) error {
service := c.FormValue("service")
if service == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
query := fmt.Sprintf("select name from labels where service='%s'", service)
rows, err := g.DB.Query(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
labels := make([]string, 0)
for rows.Next() {
var l string
rows.Scan(&l)
labels = append(labels, l)
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: labels,
})
}
func (m *Manage) CreateLabel(c echo.Context) error {
service := c.FormValue("service")
name := c.FormValue("name")
if service == "" || name == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
query := fmt.Sprintf("insert into labels (service,name) values ('%s','%s')", service, name)
_, err := g.DB.Exec(query)
if err != nil {
if strings.Contains(err.Error(), g.DUP_KEY_ERR) {
return c.JSON(http.StatusConflict, g.Result{
Status: http.StatusConflict,
ErrCode: g.AlreadyExistC,
Message: g.AlreadyExistE,
})
}
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}

@ -0,0 +1,149 @@
package manage
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/mafanr/juz/api/manage/audit"
"github.com/mafanr/juz/api/manage/strategy"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"go.uber.org/zap"
"github.com/labstack/echo"
"github.com/sunface/talent"
)
type Manage struct{}
func (m *Manage) Start() {
registerToEtcd()
e := echo.New()
//api管理
e.POST("/manage/api/query", m.QueryAPI, auth)
e.POST("/manage/api/count", m.CountAPI, auth)
e.POST("/manage/api/define", m.DefineAPI, auth)
e.POST("/manage/api/delete", m.DeleteAPI, auth)
e.POST("/manage/api/verifyParamRule", m.VerifyParamRule, auth)
e.POST("/manage/api/release", m.APIRelease, auth)
e.POST("/manage/api/batchRelease", m.APIBatchRelease, auth)
e.POST("/manage/api/offline", m.APIOffline, auth)
e.POST("/manage/api/batchStrategy", m.APIBatchStrategy, auth)
e.POST("/manage/api/batchDelStrategy", m.APIBatchDelStrategy, auth)
//策略管理
e.POST("/manage/strategy/create", strategy.Create, auth)
e.POST("/manage/strategy/update", strategy.Update, auth)
e.POST("/manage/strategy/load", strategy.Load, auth)
e.POST("/manage/strategy/change", strategy.Change, auth)
e.POST("/manage/strategy/delete", strategy.Delete, auth)
e.POST("/manage/strategy/query", strategy.Query, auth)
// e.POST("/manage/strategy/api", strategy.Api, auth)
// 审计日志
e.POST("/manage/auditLog/count", audit.Count, auth)
e.POST("/manage/auditLog/load", audit.Load, auth)
// 标签分组
e.POST("/manage/labels/query", m.QueryLabels, auth)
e.POST("/manage/labels/create", m.CreateLabel, auth)
e.Logger.Fatal(e.Start(":" + misc.Conf.Manage.Port))
}
func auth(f echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if c.FormValue("admin_token") != misc.Conf.Common.AdminToken {
return c.JSON(http.StatusUnauthorized, g.Result{
Status: http.StatusUnauthorized,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
return f(c)
}
}
func registerToEtcd() {
g.EtcdCli = g.InitEtcd(misc.Conf.Etcd.Addrs)
// 保存服务状态到etcd
ip := talent.LocalIP()
fmt.Println("local ip:", ip)
host := ip + ":" + misc.Conf.Manage.Port
go func() {
for {
err := g.StoreServer(g.EtcdCli, &g.ServerInfo{g.TFEManage, host, 0})
if err != nil {
g.L.Error("Store to etcd error", zap.Error(err))
}
time.Sleep(time.Second * g.ServiceStoreInterval)
}
}()
}
func validUserID(s string) bool {
i, err := strconv.Atoi(s)
if err != nil {
return false
}
if i == 0 {
return false
}
return true
}
func (m *Manage) serviceExist(service string) bool {
// 验证service是否存在
var temp interface{}
query := fmt.Sprintf("select id from service where name ='%s'", service)
err := g.DB.Get(&temp, query)
if err != nil {
return false
}
return true
}
func (m *Manage) canView(priv string) bool {
if priv == g.PRIV_GUEST {
return false
}
return true
}
func (m *Manage) canOperate(priv string) bool {
if priv == g.PRIV_ADMIN || priv == g.PRIV_OWNER {
return true
}
return false
}
func isServiceCreator(userID string, service string) bool {
// 验证是否是service创建者
var temp interface{}
query := fmt.Sprintf("select id from service where name ='%s' and creator='%s'", service, userID)
err := g.DB.Get(&temp, query)
if err == nil {
// 是创建者
return true
}
return false
}
func getServiceByApiName(apiName string) string {
return strings.Split(apiName, ".")[0]
}

@ -0,0 +1,340 @@
package strategy
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/mafanr/juz/api/manage/audit"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"github.com/labstack/echo"
"github.com/sunface/talent"
"go.uber.org/zap"
)
func parse(c echo.Context) (*misc.Strategy, int, string) {
str := c.FormValue("strategy")
if str == "" {
return nil, g.ParamEmptyC, g.ParamEmptyE
}
st := &misc.Strategy{}
err := json.Unmarshal([]byte(str), &st)
if err != nil {
return nil, g.ParamInvalidC, g.ParamInvalidE
}
if st.Name == "" || st.Service == "" {
return nil, g.ParamInvalidC, g.ParamInvalidE
}
return st, 0, ""
}
func Create(c echo.Context) error {
st, ecode, emsg := parse(c)
if ecode != 0 {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: ecode,
Message: emsg,
})
}
if !canOperate(c, st.Service) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
query := fmt.Sprintf("insert into strategy (name,service,type,sub_type,content,create_date) values ('%s','%s','%d','%d','%s','%s')",
st.Name, st.Service, st.Type, st.SubType, st.Content, talent.Time2StringSecond(time.Now()))
res, err := g.DB.Exec(query)
if err != nil {
if strings.Contains(err.Error(), g.DUP_KEY_ERR) {
return c.JSON(http.StatusConflict, g.Result{
Status: http.StatusConflict,
ErrCode: g.AlreadyExistC,
Message: "Strategy name already exist",
})
}
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
id, _ := res.LastInsertId()
audit.Log(c.FormValue("username"), st.Service, audit.TypeStrategy, fmt.Sprintf("%d:%s", id, st.Name), audit.OpCreate, c.FormValue("strategy"), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func Update(c echo.Context) error {
st, ecode, emsg := parse(c)
if ecode != 0 {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: ecode,
Message: emsg,
})
}
if !canOperate(c, st.Service) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
query := fmt.Sprintf("update strategy set name='%s',sub_type='%d',content='%s' where id ='%d'",
st.Name, st.SubType, st.Content, st.ID)
_, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
audit.Log(c.FormValue("username"), st.Service, audit.TypeStrategy, fmt.Sprintf("%d:%s", st.ID, st.Name), audit.OpEdit, c.FormValue("strategy"), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func Load(c echo.Context) error {
service := c.FormValue("service")
tp := c.FormValue("type")
if service == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
ss := make([]*misc.Strategy, 0)
var query string
if tp == "0" {
query = fmt.Sprintf("select * from strategy where service ='%s' order by modify_date desc", service)
} else {
query = fmt.Sprintf("select * from strategy where service ='%s' and type='%s' order by modify_date desc", service, tp)
}
err := g.DB.Select(&ss, query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: ss,
})
}
func Delete(c echo.Context) error {
service := c.FormValue("service")
id := c.FormValue("id")
name := c.FormValue("name")
tp, _ := strconv.Atoi(c.FormValue("type"))
if service == "" || id == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
if !canOperate(c, service) {
return c.JSON(http.StatusForbidden, g.Result{
Status: http.StatusForbidden,
ErrCode: g.ForbiddenC,
Message: g.ForbiddenE,
})
}
var query string
switch tp {
case misc.STRATEGY_BWLIST:
query = fmt.Sprintf("update api_define set bw_strategy='%d' where bw_strategy='%s'", 0, id)
case misc.STRATEGY_RETRY:
query = fmt.Sprintf("update api_define set retry_strategy='%d' where retry_strategy='%s'", 0, id)
case misc.STRATEGY_TRAFFIC:
query = fmt.Sprintf("update api_define set traffic_strategy='%d' where traffic_strategy='%s'", 0, id)
default:
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
_, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
switch tp {
case misc.STRATEGY_BWLIST:
query = fmt.Sprintf("update api_release set bw_strategy='%d' where bw_strategy='%s'", 0, id)
case misc.STRATEGY_RETRY:
query = fmt.Sprintf("update api_release set retry_strategy='%d' where retry_strategy='%s'", 0, id)
case misc.STRATEGY_TRAFFIC:
query = fmt.Sprintf("update api_release set traffic_strategy='%d' where traffic_strategy='%s'", 0, id)
}
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
query = fmt.Sprintf("delete from strategy where id='%s'", id)
_, err = g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
audit.Log(c.FormValue("username"), service, audit.TypeStrategy, fmt.Sprintf("%s:%s", id, name), audit.OpDelete, "", "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func Change(c echo.Context) error {
status, _ := strconv.Atoi(c.FormValue("status"))
id := c.FormValue("id")
if id == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
newS := 0
op := 0
switch status {
case misc.STRATEGY_ON:
newS = misc.STRATEGY_OFF
op = audit.OpOffline
case misc.STRATEGY_OFF:
newS = misc.STRATEGY_ON
op = audit.OpRelease
default:
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamInvalidC,
Message: g.ParamInvalidE,
})
}
query := fmt.Sprintf("update strategy set status='%d' where id='%s'", newS, id)
_, err := g.DB.Exec(query)
if err != nil {
g.L.Info("access database error", zap.Error(err), zap.String("query", query))
return c.JSON(http.StatusInternalServerError, g.Result{
Status: http.StatusInternalServerError,
ErrCode: g.DatabaseC,
Message: g.DatabaseE,
})
}
// 查询发布的strategy内容
s := misc.Strategy{}
g.DB.Get(&s, fmt.Sprintf("select * from strategy where id='%s'", id))
d, _ := json.Marshal(s)
audit.Log(c.FormValue("username"), s.Service, audit.TypeStrategy, fmt.Sprintf("%s:%s", id, s.Name), op, talent.Bytes2String(d), "")
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
})
}
func Query(c echo.Context) error {
id := c.FormValue("id")
if id == "" {
return c.JSON(http.StatusBadRequest, g.Result{
Status: http.StatusBadRequest,
ErrCode: g.ParamEmptyC,
Message: g.ParamEmptyE,
})
}
s := misc.Strategy{}
g.DB.Get(&s, fmt.Sprintf("select * from strategy where id='%s'", id))
return c.JSON(http.StatusOK, g.Result{
Status: http.StatusOK,
Data: s,
})
}
func canOperate(c echo.Context, service string) bool {
role := c.FormValue("app_priv")
userID := c.FormValue("username")
if role == g.ROLE_NORMAL {
// 验证是否是service创建者
var temp interface{}
query := fmt.Sprintf("select id from service where name ='%s' and creator='%s'", service, userID)
err := g.DB.Get(&temp, query)
if err == nil {
// 是创建者
return true
}
// 验证是否是管理员
query = fmt.Sprintf("select privilege from privilege where user_id='%s' and service='%s'", userID, service)
rows, err := g.DB.Query(query)
if !rows.Next() {
// 不存在该用户的权限
return false
}
var priv int
rows.Scan(&priv)
if priv == misc.PRIVILEGE_ADMIN {
// 是管理员
return true
}
return false
} else { //是应用级别的管理员
return true
}
}

@ -0,0 +1,144 @@
package req
import (
"errors"
"fmt"
"net/http"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"github.com/labstack/echo"
)
type Request struct {
Rid int64
Method string
Params map[string]string
Cookies []*http.Cookie
Api *misc.API
DebugOn bool
ClientIP string
BwStrategy *misc.BwStrategy
RetryStrategy *misc.RetryStrategy
TrafficStrategy *misc.TrafficStrategy
}
func (r *Request) String() string {
return fmt.Sprintf("method: %s,params: %v, api: %v, client_ip: %s", r.Method, r.Params, *r.Api, r.ClientIP)
}
func Parse(c echo.Context) (*Request, error) {
r := &Request{
Rid: c.Get("rid").(int64),
Params: make(map[string]string),
}
// 解析参数
ps, _ := c.FormParams()
for k, v := range ps {
// debug参数不透传
if k == g.DEBUG_PARAM {
if v[0] == "true" {
r.DebugOn = true
}
} else {
r.Params[k] = v[0]
}
}
// 解析cookie
r.Cookies = c.Cookies()
r.ClientIP = c.RealIP()
r.Method = c.Request().Method
var apiName string
var version string
// 判断是路径映射还是api映射
uri := c.Request().URL.Path
if uri == "/service/api" {
apiName = r.Params["service_id"]
if apiName == "" {
//新网关使用以下参数'api_name'
apiName = r.Params["api_name"]
if apiName == "" {
return r, errors.New("api_name not founded")
}
}
} else {
apiName = uri
}
version = r.Params["api_version"]
if version == "" {
// 如果没有传version就默认为版本1
version = "1"
}
apiID := apiName + ".v" + version
// 获取api信息
apiI, ok := misc.Apis.Load(apiID)
if !ok {
return r, errors.New("api id not exist")
}
r.Api = apiI.(*misc.API)
// 生成策略
strategy(r)
return r, nil
}
func strategy(r *Request) {
// 设置策略停用时的默认值
r.BwStrategy = &misc.BwStrategy{
Type: 0,
}
if r.Api.BwStrategy != 0 {
s1, ok := misc.Strategies.Load(r.Api.BwStrategy)
if ok {
s := s1.(*misc.Strategy)
if s.Status == misc.STRATEGY_ON {
r.BwStrategy = s.DetailContent.(*misc.BwStrategy)
}
}
}
// 设置策略停用时的默认值
r.RetryStrategy = &misc.RetryStrategy{
ReqTimeout: misc.REQ_TIMEOUT,
RetryTimes: misc.RETRY_TIMES,
RetryInterval: misc.RETRY_INTERVAL,
}
if r.Api.RetryStrategy != 0 {
s1, ok := misc.Strategies.Load(r.Api.RetryStrategy)
if ok {
s := s1.(*misc.Strategy)
if s.Status == misc.STRATEGY_ON {
r.RetryStrategy = s.DetailContent.(*misc.RetryStrategy)
}
}
}
// 设置策略停用时的默认值
r.TrafficStrategy = &misc.TrafficStrategy{
QPS: misc.STRATEGY_NO_LIMIT,
Concurrent: misc.STRATEGY_NO_LIMIT,
Param: "",
}
if r.Api.TrafficStrategy != 0 {
s1, ok := misc.Strategies.Load(r.Api.TrafficStrategy)
if ok {
s := s1.(*misc.Strategy)
if s.Status == misc.STRATEGY_ON {
r.TrafficStrategy = s.DetailContent.(*misc.TrafficStrategy)
}
}
}
}

@ -0,0 +1,170 @@
package api
import (
"errors"
"net/http"
"strconv"
"time"
"github.com/mafanr/juz/api/filter"
"github.com/mafanr/juz/api/manage"
"github.com/mafanr/juz/api/req"
"github.com/mafanr/juz/api/stats"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"go.uber.org/zap"
"github.com/prometheus/client_golang/prometheus"
"github.com/sunface/talent"
"github.com/labstack/echo"
"github.com/valyala/fasthttp"
)
/* 请求路由模块,所有进入的请求都在这里被处理和路由*/
const (
SYNC = 1
REDIRECT = 2
ASYNC = 9
)
type router struct {
apiServer *ApiServer
*filter.Filter
}
/*----------------------请求路由---------------------*/
func (router *router) route(c echo.Context) error {
// 解析请求
r, err := req.Parse(c)
if err != nil {
c.Set("api_id", "error_api_id")
c.Set("service", "error_service")
c.Set("label", "error_label")
c.Set("error_msg", err)
return c.JSON(http.StatusBadRequest, g.Result{r.Rid, http.StatusBadRequest, g.ParamInvalidC, err.Error(), nil})
}
c.Set("api_id", r.Api.APIID)
c.Set("service", r.Api.Service)
c.Set("label", r.Api.Label)
g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "request content", zap.Int64("rid", r.Rid), zap.String("req", r.String()))
// 判断api是否发布
if r.Api.Status != misc.API_RELEASED {
return c.JSON(http.StatusBadRequest, g.Result{r.Rid, http.StatusBadRequest, manage.APIOfflineC, manage.APIOfflineE, nil})
}
// 在请求路由之前进行过滤
res := router.BeforeRoute(r)
if res.Status != 0 {
c.Set("error_msg", errors.New(res.Emsg))
return c.JSON(res.Status, g.Result{r.Rid, res.Status, res.Ecode, res.Emsg, nil})
}
// 开始请求
var code int
var body []byte
switch r.Api.RouteType {
case SYNC: // 同步请求
code, body, err = router.sync(r)
case REDIRECT: //重定向
return router.redirect(c, r)
}
// 请求失败,通知客户端
if err != nil {
c.Set("error_msg", err)
return c.JSON(code, g.Result{r.Rid, code, g.ReqFailedC, err.Error(), nil})
}
g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "response body", zap.Int64("rid", r.Rid), zap.Int("code", code), zap.String("body", talent.Bytes2String(body)))
// 成功时把请求id放在header中返回避免污染返回结果
c.Response().Header().Add("rid", strconv.FormatInt(r.Rid, 10))
// 返回给客户端成功的结果
return c.String(code, talent.Bytes2String(body))
}
func (rt *router) redirect(c echo.Context, r *req.Request) error {
// 组装参数
url := r.Api.RouteAddr + "?" + c.QueryString()
return c.Redirect(http.StatusMovedPermanently, url)
}
func (rt *router) sync(r *req.Request) (int, []byte, error) {
args := &fasthttp.Args{}
for k, v := range r.Params {
args.Set(k, v)
}
req := &fasthttp.Request{}
resp := &fasthttp.Response{}
// 放入自定义cookie信息
for _, ck := range r.Cookies {
req.Header.SetCookie(ck.Name, ck.Value)
}
// 请求头部加入request id方便后续业务进行跟踪
req.Header.Set("rid", strconv.FormatInt(r.Rid, 10))
req.Header.SetMethod(r.Method)
// 写入客户端真实ip
req.Header.Set("X-Forwarded-For", r.ClientIP)
url := r.Api.RouteAddr
switch r.Method {
case "GET":
// 拼接url
url = url + "?" + args.String()
default:
args.WriteTo(req.BodyWriter())
}
req.SetRequestURI(url)
// 超时重试
retrys := 0
var err error
for {
res := rt.Filter.BeforeCall(r)
if res.Status != 0 {
return res.Status, nil, errors.New(res.Emsg)
}
err = g.Cli.DoTimeout(req, resp, time.Duration(r.RetryStrategy.ReqTimeout)*time.Second)
// time.Sleep(10 * time.Second)
rt.Filter.AfterCall(r)
if err == nil {
// 统计请求code
stats.Codes.With(prometheus.Labels{
"code": strconv.Itoa(resp.StatusCode()),
"api_id": r.Api.APIID,
"service": r.Api.Service,
"label": r.Api.Label,
}).Inc()
break
}
// 统计请求错误
stats.Errors.With(prometheus.Labels{
"api_id": r.Api.APIID,
"service": r.Api.Service,
"label": r.Api.Label,
}).Inc()
// 发生错误,进行重试
if retrys >= r.RetryStrategy.RetryTimes {
break
}
time.Sleep(time.Duration(r.RetryStrategy.RetryInterval) * time.Second)
retrys++
}
return resp.StatusCode(), resp.Body(), err
}

Binary file not shown.

@ -0,0 +1,115 @@
create database if not exists mafanr_juz;
USE mafanr_juz;
CREATE TABLE IF NOT EXISTS `api_release` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`api_id` varchar(255) NOT NULL COMMENT 'API ID',
`path_type` int(11) DEFAULT '0' COMMENT '是否是路径映射类型0代表否1代表是',
`service` varchar(255) NOT NULL COMMENT 'service名',
`description` text COMMENT '介绍',
`route_type` int(11) NOT NULL DEFAULT '1' COMMENT '代理类型',
`route_addr` varchar(255) NOT NULL COMMENT '后段服务地址',
`route_proto` int(11) DEFAULT '1' COMMENT '后端服务协议',
`mock_data` text COMMENT 'mock类型接口返回定义的mock数据',
`retry_strategy` int(11) DEFAULT '0' COMMENT '重试策略ID',
`bw_strategy` int(11) DEFAULT '0' COMMENT '黑白名单策略ID',
`traffic_strategy` int(11) DEFAULT '0' COMMENT '流量控制策略ID',
`traffic_on` int(11) DEFAULT '0' COMMENT '是否开启流量路由',
`traffic_api` varchar(255) DEFAULT '' COMMENT '指定部分流量路由到该api name',
`traffic_ratio` int(11) DEFAULT '0' COMMENT '被路由的流量占比, 0<=x<=100',
`traffic_ips` varchar(255) DEFAULT '' COMMENT '指定来自哪些ip的流量被路由',
`verify_on` int(11) DEFAULT '0' COMMENT '是否开启参数验证',
`param_rules` longtext COMMENT '参数验证表',
`cached_time` int(11) DEFAULT '0' COMMENT '为0表示不开启,其它值代表缓存的时间',
`label` varchar(255) DEFAUlT '' COMMENT '标签分组',
`status` int(11) DEFAULT '1' COMMENT '发布状态0代表未发布,1代表已发布',
`create_date` datetime DEFAULT NULL COMMENT '创建时间',
`modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `Index_api_apiid` (`api_id`) USING BTREE,
KEY `Index_api_modifydate` (`modify_date`) USING BTREE,
KEY `Index_api_status` (`status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT 'api发布表';
CREATE TABLE IF NOT EXISTS `api_define` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`api_id` varchar(255) NOT NULL COMMENT 'API ID',
`path_type` int(11) DEFAULT '0' COMMENT '是否是路径映射类型0代表否1代表是',
`service` varchar(255) NOT NULL COMMENT 'service名',
`description` text COMMENT '介绍',
`route_type` int(11) NOT NULL DEFAULT '1' COMMENT '代理类型',
`route_addr` varchar(255) NOT NULL COMMENT '后段服务地址',
`route_proto` int(11) DEFAULT '1' COMMENT '后端服务协议',
`mock_data` text COMMENT 'mock类型接口返回定义的mock数据',
`retry_strategy` int(11) DEFAULT '0' COMMENT '重试策略ID',
`bw_strategy` int(11) DEFAULT '0' COMMENT '黑白名单策略ID',
`traffic_strategy` int(11) DEFAULT '0' COMMENT '流量控制策略ID',
`traffic_on` int(11) DEFAULT '0' COMMENT '是否开启流量路由',
`traffic_api` varchar(255) DEFAULT '' COMMENT '指定部分流量路由到该api name',
`traffic_ratio` int(11) DEFAULT '0' COMMENT '被路由的流量占比, 0<=x<=100',
`traffic_ips` varchar(255) DEFAULT '' COMMENT '指定来自哪些ip的流量被路由',
`verify_on` int(11) DEFAULT '0' COMMENT '是否开启参数验证',
`param_rules` longtext COMMENT '参数验证表',
`cached_time` int(11) DEFAULT '0' COMMENT '为0表示不开启,其它值代表缓存的时间',
`label` varchar(255) DEFAUlT '' COMMENT '标签分组',
`revise_version` varchar(255) NOT NULL COMMENT 'api内容最新版本号',
`release_version` varchar(255) DEFAULT '' COMMENT '当前已发布的版本号',
`create_date` datetime DEFAULT NULL COMMENT '创建时间',
`modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `Index_api_apiid` (`api_id`) USING BTREE,
KEY `Index_strategy_modifydate` (`modify_date`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT 'api_define';
CREATE TABLE IF NOT EXISTS `strategy` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(255) DEFAULT '' COMMENT 'strategy名',
`service` varchar(255) DEFAULT '' COMMENT 'service名',
`type` int(11) DEFAULT NULL COMMENT '策略类型,1黑白名单2超时重试等等',
`sub_type` int(11) DEFAULT NULL COMMENT '策略的子类型,一般是和内容相关的类型,例如黑白名单中的黑或者白',
`content` longtext COMMENT '策略的具体内容',
`status` int(11) DEFAULT '1' COMMENT '1开启, 0停用',
`create_date` datetime DEFAULT NULL,
`modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
PRIMARY KEY (`id`),
KEY `Index_strategy_service` (`service`) USING BTREE,
UNIQUE KEY `Index_api_strategy_servicename` (`service`,`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '通用策略表';
CREATE TABLE IF NOT EXISTS `audit_log` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`user_id` varchar(255) DEFAULT '' COMMENT 'strategy名',
`service` varchar(255) DEFAULT '' COMMENT 'service名',
`target_type` int(11) DEFAULT '0' COMMENT '目标类型1:service 2: api 3: 策略',
`target_id` varchar(255) DEFAUlT '' COMMENT '操作的目标id/name',
`op_type` int(11) DEFAULT '0' COMMENT '操作类型:1. 创建 2. 更新 3. 管理 4. 删除',
`content` TEXT COMMENT '具体内容',
`description` TEXT COMMENT '操作备注',
`modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
PRIMARY KEY (`id`),
KEY `Index_audit_log_service` (`service`) USING BTREE,
KEY `Index_audit_log_tid` (`target_id`) USING BTREE,
KEY `Index_audit_log_type` (`target_type`) USING BTREE,
KEY `Index_audit_log_modifydate` (`modify_date`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '审计日志表';

@ -0,0 +1,40 @@
package stats
import (
"github.com/prometheus/client_golang/prometheus"
)
//ApiID指标项: 请求数、http code分布、错误统计、耗时统计、QPS
//Label指标项继承ApiID请求数top n,错误数top n耗时top n, qps top n
//Service指标项: 继承Label
var (
Req = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "req_distribution",
Help: "Request stats",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"api_id", "service", "label"})
Limits = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "req_limits",
Help: "Request blocked",
}, []string{"api_id", "service", "label"})
Errors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "req_errors",
Help: "Request error",
}, []string{"api_id", "service", "label"})
Codes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "req_codes",
Help: "Reqeust http code",
},
[]string{"code", "api_id", "service", "label"},
)
)
func init() {
// Metrics have to be registered to be exposed:
prometheus.MustRegister(Req)
prometheus.MustRegister(Limits)
prometheus.MustRegister(Errors)
prometheus.MustRegister(Codes)
}

@ -0,0 +1 @@
# 集成测试

@ -0,0 +1,34 @@
package main
import (
"flag"
"log"
"runtime"
"github.com/valyala/fasthttp"
)
func main() {
runtime.GOMAXPROCS(1)
flag.Parse()
h := requestHandler
if err := fasthttp.ListenAndServe("localhost:10001", h); err != nil {
log.Fatalf("Error in ListenAndServe: %s", err)
}
}
func requestHandler(ctx *fasthttp.RequestCtx) {
ctx.SetContentType("text/plain; charset=utf8")
// Set arbitrary headers
ctx.Response.Header.Set("X-My-Header", "my-header-value")
// Set cookies
var c fasthttp.Cookie
c.SetKey("cookie-name")
c.SetValue("cookie-value")
ctx.Response.Header.SetCookie(&c)
}

@ -0,0 +1,16 @@
一、代理
0.通用规则
1. 除非请求业务方返回200否则tfe不可以返回200 code
1.同步代理
1对GET/POST/PUT/DELETE进行测试Method需要透传
2对参数service_id为空、不存在的错误进行验证
3被请求方要能看到最初的客户端IP通过X-Forwarded-For透传
2.重定向
1) 301
2) 目前只有Get请求的参数会透传
二、数据加载
1. 启动时数据全量加载
2. 更新时,重新加载某一行

@ -0,0 +1,71 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"os"
"os/signal"
"strings"
"syscall"
"github.com/mafanr/juz/misc"
"github.com/mafanr/juz/api"
"github.com/mafanr/g"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// apiCmd represents the api command
var apiCmd = &cobra.Command{
Use: "api",
Short: "api gateway",
Long: ``,
// Uncomment the following line if your bare application
// has an action associated with it:
Run: func(cmd *cobra.Command, args []string) {
misc.InitConfig("juz.conf")
misc.Conf.Common.LogLevel = strings.ToLower(misc.Conf.Common.LogLevel)
g.InitLogger()
g.L.Info("Application version", zap.String("version", misc.Conf.Common.Version))
p := &api.ApiServer{}
p.Start()
// 等待服务器停止信号
chSig := make(chan os.Signal)
signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
sig := <-chSig
g.L.Info("juz received signal", zap.Any("signal", sig))
},
}
func init() {
rootCmd.AddCommand(apiCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// apiCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// apiCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

@ -0,0 +1,55 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"fmt"
"os"
"github.com/spf13/cobra"
)
var cfgFile string
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "tfe",
Short: "Api网关",
Long: ``,
// Uncomment the following line if your bare application
// has an action associated with it:
Run: func(cmd *cobra.Command, args []string) {
},
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func init() {
// Here you will define your flags and configuration settings.
// Cobra supports persistent flags, which, if defined here,
// will be global for your application.
// rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.tfe.yaml)")
// Cobra also supports local flags, which will only run
// when this action is called directly.
// rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

@ -0,0 +1,67 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"os"
"os/signal"
"strings"
"syscall"
"github.com/mafanr/juz/misc"
"github.com/mafanr/juz/traffic"
"github.com/mafanr/g"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// trafficCmd represents the traffic command
var trafficCmd = &cobra.Command{
Use: "traffic",
Short: "traffic control center",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
misc.InitConfig("juz.conf")
misc.Conf.Common.LogLevel = strings.ToLower(misc.Conf.Common.LogLevel)
g.InitLogger()
g.L.Info("Application version", zap.String("version", misc.Conf.Common.Version))
p := &traffic.Traffic{}
p.Start()
// 等待服务器停止信号
chSig := make(chan os.Signal)
signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
sig := <-chSig
g.L.Info("juz received signal", zap.Any("signal", sig))
},
}
func init() {
rootCmd.AddCommand(trafficCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// trafficCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// trafficCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

@ -0,0 +1,33 @@
common:
version: 0.0.1
loglevel: warn
admintoken: "juz.io"
api:
port: "8081"
serverid: 1
manage:
port: "8089"
mysql:
# addr: 10.7.13.48
# port: 8066
# database: tfe
# acc: tfe
# pw: EUnt7sbiRzYzpRLz8S21
addr: localhost
port: 3306
database: mafanr_juz
acc: root
pw:
etcd:
addrs:
- "localhost:2379"
# - "10.7.24.191:2379"
# - "10.7.24.191:2379"
traffic:
host: "127.0.0.1"
port: "8088"

@ -0,0 +1,21 @@
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import "github.com/mafanr/juz/cmd"
func main() {
cmd.Execute()
}

@ -0,0 +1,69 @@
// Copyright © 2018 Sunface <CTO@188.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package misc
import (
"io/ioutil"
"log"
"gopkg.in/yaml.v2"
)
type Config struct {
Common struct {
Version string
LogLevel string
AdminToken string
}
Api struct {
Port string
ServerID int64
}
Manage struct {
Port string
}
Mysql struct {
Addr string
Port string
Database string
Acc string
Pw string
}
Etcd struct {
Addrs []string
}
Traffic struct {
Host string
Port string
}
}
var Conf *Config
func InitConfig(path string) {
conf := &Config{}
data, err := ioutil.ReadFile(path)
if err != nil {
log.Fatal("read config error :", err)
}
err = yaml.Unmarshal(data, &conf)
if err != nil {
log.Fatal("yaml decode error :", err)
}
Conf = conf
}

@ -0,0 +1,58 @@
package misc
const TEST_API_NAME = "devops.test.get.v1"
// 默认请求策略
const (
REQ_TIMEOUT = 15
RETRY_TIMES = 0
RETRY_INTERVAL = 5
)
const (
PRIVILEGE_SUPER_ADMIN = 0
PRIVILEGE_ADMIN = 1
PRIVILEGE_NORMAL = 2
PRIVILEGE_VIEWER = 3
)
// 通用的
const (
ON = 1
OFF = 0
)
// 以下值绝对不可更改,前端也在使用
const (
STRATEGY_NO_LIMIT = 0
BW_OFF = 0
BLACK_LIST = 1
WHITE_LIST = 2
IP_TYPE = 1
PARAM_TYPE = 2
TRAFFIC_ON = 1
TRAFFIC_OFF = 0
PARAM_VERIFY_ON = 1
PARAM_VERIFY_OFF = 0
API_RELEASED = 1
API_OFFLINE = 0
STRATEGY_ALL = -1
STRATEGY_EMPTY = 0
STRATEGY_BWLIST = 1
STRATEGY_RETRY = 2
STRATEGY_TRAFFIC = 3
STRATEGY_ON = 1
STRATEGY_OFF = 0
)
// redis后缀
const (
TRAFFIC_CONCURRENT = ".cr"
)

@ -0,0 +1,140 @@
package misc
import (
"fmt"
"sync"
"github.com/mafanr/g"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)
type API struct {
ID int `db:"id" json:"id"`
// 基本设置
Service string `db:"service" json:"service"`
APIID string `db:"api_id" json:"api_id"`
PathType int `db:"path_type" json:"path_type"`
Desc *string `db:"description" json:"desc"`
RouteType int `db:"route_type" json:"route_type"`
RouteAddr string `db:"route_addr" json:"route_addr"`
RouteProto int `db:"route_proto" json:"route_proto"`
MockData *string `db:"mock_data" json:"mock_data"`
// 通用策略
RetryStrategy int `db:"retry_strategy" json:"retry_strategy"`
BwStrategy int `db:"bw_strategy" json:"bw_strategy"`
TrafficStrategy int `db:"traffic_strategy" json:"traffic_strategy"`
// 流量路由
TrafficOn int `db:"traffic_on" json:"traffic_on"`
TrafficAPI string `db:"traffic_api" json:"traffic_api"`
TrafficRatio int `db:"traffic_ratio" json:"traffic_ratio"`
TrafficIPs string `db:"traffic_ips" json:"traffic_ips"`
// 参数验证
VerifyOn int `db:"verify_on" json:"verify_on"`
ParamTable *string `db:"param_rules" json:"param_rules"`
// 缓存
CachedTime int `db:"cached_time" json:"cached_time"`
// 标签分组
Label string `db:"label" json:"label"`
// API修订的版本号
ReviseVersion string `db:"revise_version" json:"revise_version"`
ReleaseVersion string `db:"release_version" json:"release_version"`
Status int `db:"status" json:"status"`
// 日期相关
CreateDate string `db:"create_date" json:"create_date"`
ModifyDate string `db:"modify_date" json:"modify_date"`
ParamRules *sync.Map
}
type Service struct {
ID int `db:"id"`
Name string `db:"name"`
Creator string `db:"creator"`
CreateDate string `db:"create_date"`
ModifyDate string `db:"modify_date"`
}
type BW struct {
Type int `json:"type"`
Key string `json:"key"`
Val string `json:"val"`
}
type ParamRule struct {
Param string `json:"param"`
ParamRule string `json:"rule"`
TestData string `json:"test_data"`
}
type Strategy struct {
ID int `db:"id" json:"id"`
Name string `db:"name" json:"name"`
Service string `db:"service" json:"service"`
Type int `db:"type" json:"type"`
SubType int `db:"sub_type" json:"sub_type"`
Content string `db:"content" json:"content"`
Status int `db:"status" json:"status"`
CreateDate string `db:"create_date" json:"create_date"`
ModifyDate string `db:"modify_date" json:"modify_date"`
DetailContent interface{} `json:"-"` // 把content翻译成具体的策略语言
}
type BwStrategy struct {
Type int // 黑 or 白
BwList []*BW
}
type RetryStrategy struct {
ReqTimeout int `json:"req_timeout"`
RetryTimes int `json:"retry_times"`
RetryInterval int `json:"retry_interval"`
}
type TrafficStrategy struct {
// 接口流量
QPS int `json:"qps"`
Concurrent int `json:"concurrent"`
// 用户流量
Param string `json:"param"` // 限制参数
Span int `json:"span"` // 限定时间
Times int `json:"times"` // 限定次数
// 熔断设置
FuseError int `json:"fuse_error"` // 熔断错误率,大于该值时,触发熔断保护
FuseErrorCount int `json:"fuse_error_count"` // 熔断触发的最小请求次数
FusePercent int `json:"fuse_percent"` // 熔断触发后允许访问的百分比例如100次请求只有50次允许通过
FuseRecover int `json:"fuse_recover"` // 熔断错误率小于该值时,取消熔断保护
FuseRecoverCount int `json:"fuse_recover_count"` // 熔断恢复的最小请求次数
}
var Apis = &sync.Map{}
var Strategies = &sync.Map{}
func InitMysql() {
var err error
// 初始化mysql连接
sqlConn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", Conf.Mysql.Acc, Conf.Mysql.Pw,
Conf.Mysql.Addr, Conf.Mysql.Port, Conf.Mysql.Database)
g.DB, err = sqlx.Open("mysql", sqlConn)
if err != nil {
g.L.Fatal("init mysql error", zap.Error(err))
}
// 测试db是否正常
err = g.DB.Ping()
if err != nil {
g.L.Fatal("init mysql, ping error", zap.Error(err))
}
}

@ -0,0 +1,11 @@
package misc
type TrafficConReq struct {
ApiName string
StrategyID int
ParamVal string // 用户流量限定中对应的参数值例如限定了mobile则对应的值为15880261185
}
type TrafficConRes struct {
Suc bool
Error string
}

@ -0,0 +1,70 @@
package traffic
import (
"encoding/json"
"fmt"
"time"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"github.com/sunface/talent"
"go.uber.org/zap"
)
func (t *Traffic) loadData() {
lastLoadTime = time.Now()
// 加载所有数据
t.loadAll()
// 定时加载最新的信息
go t.loadUpdated()
}
var lastLoadTime time.Time
func (t *Traffic) loadAll() {
// 加载所有strategy
strategies := make([]*misc.Strategy, 0)
err := g.DB.Select(&strategies, "select * from strategy")
if err != nil {
g.L.Fatal("load strategies error!", zap.Error(err))
}
for _, s := range strategies {
if s.Type == misc.STRATEGY_TRAFFIC {
ts := &misc.TrafficStrategy{}
json.Unmarshal([]byte(s.Content), &ts)
t.Strategies.Store(s.ID, ts)
}
}
}
func (t *Traffic) loadUpdated() {
for {
// 为了防止访问时恰好在更新数据这里给出2秒的容忍值
lastT := talent.Time2String(lastLoadTime.Add(-2 * time.Second))
lastLoadTime = time.Now()
// 加载策略
strategies := make([]*misc.Strategy, 0)
query := fmt.Sprintf("select * from strategy where modify_date >= '%s'", lastT)
err := g.DB.Select(&strategies, query)
if err != nil {
g.L.Error("load strategies error!", zap.Error(err), zap.String("query", query))
return
}
for _, s := range strategies {
if s.Type == misc.STRATEGY_TRAFFIC {
ts := &misc.TrafficStrategy{}
json.Unmarshal([]byte(s.Content), &ts)
t.Strategies.Store(s.ID, ts)
}
}
time.Sleep(10 * time.Second)
}
}

@ -0,0 +1,137 @@
// 对API接口的QPS进行限制
package traffic
import (
"sync"
"time"
"github.com/mafanr/juz/misc"
)
var apiRates = &sync.Map{}
type apiRate struct {
concurrent int
conUpdate time.Time
qps int
qpsUpdate time.Time
params *sync.Map
}
type param struct {
times int
lastTime time.Time
}
// 能调用此函数说明qps、并发至少有一项受到了限制
func (rl *RateLimiter) IncApiRate(req misc.TrafficConReq, reply *misc.TrafficConRes) error {
reply.Suc = true
si, ok := traffic.Strategies.Load(req.StrategyID)
if !ok {
// 不存在策略,就没有限制
return nil
}
s := si.(*misc.TrafficStrategy)
ari, ok := apiRates.Load(req.ApiName)
now := time.Now()
if !ok {
// 之前没有数据,初始化数据
ar := apiRate{1, now, 1, now, &sync.Map{}}
apiRates.Store(req.ApiName, &ar)
return nil
}
ar := ari.(*apiRate)
// QPS限制
if s.QPS != misc.STRATEGY_NO_LIMIT {
//要检查一下时间,如果超过一秒,则重新初始化数据
if now.Sub(ar.qpsUpdate) > 1e9 {
ar.qps = 1
ar.qpsUpdate = now
return nil
}
// 没超过一秒,超出限制,返回失败
if ar.qps >= s.QPS {
reply.Suc = false
return nil
}
// 没超过一秒,每超出,更新数据
ar.qps++
}
// 对并发进行限制
if s.Concurrent != misc.STRATEGY_NO_LIMIT {
// 这里要做一个防BUG设置防止某些情况下并发数到了最大值结果不能减少或者归零
if ar.concurrent >= s.Concurrent {
// 如果当前时间距离上次更新并发时间大于5秒则认为出现了bug重置并发
if now.Sub(ar.conUpdate) > 1e9 {
ar.concurrent = 0
ar.conUpdate = now
return nil
}
// 当前数量已经超过了并发数
reply.Suc = false
return nil
}
ar.concurrent++
ar.conUpdate = now
}
// 用户流量,访问次数限制
if req.ParamVal != "" {
pi, ok := ar.params.Load(req.ParamVal)
if !ok {
// 不存在记录,初始化
ar.params.Store(req.ParamVal, &param{1, now})
return nil
}
// 存在记录
p := pi.(*param)
// 判断当前时间和上次初始化的时间的跨度
if now.Sub(p.lastTime) > time.Duration(s.Span)*1e9 {
// 时间跨度已经超出,重新初始化
p.times = 1
p.lastTime = now
return nil
}
// 还在限制时间范围内,判断请求次数是否超出
if p.times >= s.Times {
reply.Suc = false
return nil
}
p.times++
}
return nil
}
func (rl *RateLimiter) DecApiRate(req misc.TrafficConReq, reply *misc.TrafficConRes) error {
reply.Suc = true
// 获取该api之前的计数
ari, ok := apiRates.Load(req.ApiName)
if !ok {
return nil
}
ar := ari.(*apiRate)
if ar.concurrent == 0 {
return nil
}
if ar.concurrent > 0 && ar.concurrent-1 <= 0 {
ar.concurrent = 0
return nil
}
ar.concurrent--
return nil
}

@ -0,0 +1,71 @@
package traffic
import (
"net"
"net/rpc"
"sync"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
"go.uber.org/zap"
)
var traffic *Traffic
type Traffic struct {
Strategies *sync.Map
}
func (t *Traffic) Start() {
// 初始化mysql连接
misc.InitMysql()
// 加载配置数据
t.Strategies = &sync.Map{}
t.loadData()
// 启动rpc服务
t.startRpcServer()
traffic = t
}
func (t *Traffic) Shutdown() {
g.L.Info("shutdown tfe..")
}
type RateLimiter struct{}
// 请不要修改该函数用来测试rpc是否存活
func (rl *RateLimiter) Ping(req int, reply *int) error {
*reply = 1
return nil
}
func (t *Traffic) startRpcServer() {
rl := new(RateLimiter)
server := rpc.NewServer()
err := server.Register(rl)
if err != nil {
g.L.Fatal("register error", zap.Error(err))
}
g.L.Info("Listen tcp on port", zap.String("port", misc.Conf.Traffic.Port))
l, err := net.Listen("tcp", ":"+misc.Conf.Traffic.Port)
if err != nil {
g.L.Fatal("listen error", zap.Error(err))
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
g.L.Error("accept error", zap.Error(err))
continue
}
server.ServeConn(conn)
}
}()
}
Loading…
Cancel
Save