diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..dc4936a6 Binary files /dev/null and b/.DS_Store differ diff --git a/api/api_server.go b/api/api_server.go new file mode 100644 index 00000000..8bad95a0 --- /dev/null +++ b/api/api_server.go @@ -0,0 +1,125 @@ +package api + +import ( + "net/http" + "net/rpc" + "tfgo/tfe/api/async" + "time" + + "github.com/mafanr/juz/api/filter" + "github.com/mafanr/juz/api/manage" + "github.com/mafanr/juz/api/stats" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + _ "github.com/go-sql-driver/mysql" + "github.com/labstack/echo" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" +) + +type ApiServer struct { + manage *manage.Manage + router *router +} + +func (p *ApiServer) Start() { + g.L.Info("start tfe..") + + // 初始化mysql连接 + misc.InitMysql() + + // 从mysql中加载所有的api信息到内存中 + p.loadData() + + p.manage = &manage.Manage{} + go p.manage.Start() + + p.router = &router{p, &filter.Filter{}} + + // 连接到traffic rpc服务 + p.initTraffic() + + // 启动proxy http服务 + go p.listen() + + // 启动metrics收集 + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":6062", nil) +} + +func (o *ApiServer) Shutdown() { + g.L.Info("shutdown tfe..") +} + +func (p *ApiServer) listen() { + e := echo.New() + + // 回调相关 + //同步回调接口 + e.Any("/*", p.router.route, timing) + //异步回调接口 + e.POST("/notify", async.Notify) + + e.Logger.Fatal(e.Start(":" + misc.Conf.Api.Port)) +} + +func timing(f echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + ts := time.Now() + rid := (ts.UnixNano()/10)*10 + misc.Conf.Api.ServerID + g.L.Info("New request accepted", zap.Int64("rid", rid), zap.String("ip", c.RealIP())) + c.Set("rid", rid) + defer func() { + // 统计请求指标 + apiID := c.Get("api_id").(string) + service := c.Get("service").(string) + label := c.Get("label").(string) + stats.Req.With(prometheus.Labels{ + "api_id": apiID, + "service": service, + "label": label, + }).Observe(float64(time.Now().Sub(ts).Nanoseconds() / 1e6)) + + err := c.Get("error_msg") + if err == nil { + g.L.Info("Request success", zap.Int64("rid", rid)) + } else { + g.L.Info("Request failed", zap.Int64("rid", rid), zap.Error(err.(error))) + } + }() + + return f(c) + } +} + +func (as *ApiServer) initTraffic() { + r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port) + if err != nil { + g.L.Fatal("connect to raffic error", zap.Error(err)) + } + as.router.Filter.Rpc = r + + // 定时检测rpc连接的存活性 + go func() { + for { + var res int + err := as.router.Filter.Rpc.Call("RateLimiter.Ping", 1, &res) + if err != nil || res != 1 { + g.L.Warn("rpc ping failed", zap.Error(err)) + r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port) + if err != nil { + g.L.Warn("re-connect to traffic error", zap.Error(err)) + time.Sleep(2 * time.Second) + continue + } + as.router.Filter.Rpc = r + g.L.Info("re-connect to traffic ok") + } + + time.Sleep(3 * time.Second) + } + }() +} diff --git a/api/api_test.go b/api/api_test.go new file mode 100644 index 00000000..7b859e11 --- /dev/null +++ b/api/api_test.go @@ -0,0 +1,29 @@ +package api + +import ( + "net/http" + "testing" + "time" + + "github.com/mafanr/g" + + "github.com/stretchr/testify/assert" +) + +func TestTfeStartStop(t *testing.T) { + p := &Proxy{} + g.InitConfig("../tfe.conf") + g.InitLogger() + + go func() { + p.Start() + }() + + time.Sleep(2 * time.Second) + + resp, err := http.Get("http://localhost:" + g.Conf.Common.Port + "/service/api?service_id=" + g.TEST_API_NAME) + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode) + + p.Shutdown() +} diff --git a/api/filter/auth.go b/api/filter/auth.go new file mode 100644 index 00000000..b338de0f --- /dev/null +++ b/api/filter/auth.go @@ -0,0 +1,7 @@ +package filter + +/*---------------访问授权和验签-----------------*/ +// AK/SK的方式,达成以下目标 +//1. 鉴别用户是否有访问应用的权限: ak能否对上 +//2. 鉴别访问数据是否完整 md5 +//3. 验证用户签名: RSA, 用sk diff --git a/api/filter/bw_list.go b/api/filter/bw_list.go new file mode 100644 index 00000000..f0ae534b --- /dev/null +++ b/api/filter/bw_list.go @@ -0,0 +1,53 @@ +package filter + +import ( + "fmt" + + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/misc" +) + +/* +黑白名单 +*/ + +func (f *Filter) checkBW(r *req.Request) error { + if r.BwStrategy == nil || r.BwStrategy.Type == 0 { + return nil + } + + inList := false + var k, v string + for _, bw := range r.BwStrategy.BwList { + if bw.Type == misc.IP_TYPE { + // IP类型 + if bw.Val == r.ClientIP { + inList = true + k = bw.Key + v = bw.Val + break + } + } else { + // 参数类型 + v, ok := r.Params[bw.Key] + if ok && v == bw.Val { + inList = true + k = bw.Key + v = bw.Val + break + } + } + } + + if r.BwStrategy.Type == misc.BLACK_LIST { + if inList { // 在黑名单上 + return fmt.Errorf("Black list:%s, user %s is blocked", k, v) + } + return nil + } + if inList { // 在白名单上 + return nil + } + return fmt.Errorf("White list: %s, user %s is blocked", k, v) + +} diff --git a/api/filter/filter.go b/api/filter/filter.go new file mode 100644 index 00000000..c43b1009 --- /dev/null +++ b/api/filter/filter.go @@ -0,0 +1,96 @@ +package filter + +import ( + "net/http" + "net/rpc" + + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/api/stats" + + "github.com/mafanr/g" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +/* +请求拦截过滤插件. +该模块组的核心目的是给网关提供扩展性插件. +我们应该尽量保证网关核心层的不变性,所有的变化的插件都在fileter中实现. +*/ + +// 可以在以下四个地方添加拦截插件 +//1.请求开始前 +//2.接受到服务的返回数据,返回给客户端之前 +//3.请求失败后 + +// 拦截器顺序 +//1.拦截器分为网关自带和用户定义的 +//2.每个拦截器应该具有优先级编号 +//3.按照优先级来调用拦截器 + +type Filter struct { + Rpc *rpc.Client +} + +type Result struct { + Status int + Ecode int + Emsg string +} + +// 收到请求,路由前的hook +func (f *Filter) BeforeRoute(r *req.Request) Result { + // 黑白名单 + err := f.checkBW(r) + if err != nil { + g.L.Info("BeforeRoute failed", zap.Error(err)) + // 统计被阻挡数 + stats.Limits.With(prometheus.Labels{ + "api_id": r.Api.APIID, + "service": r.Api.Service, + "label": r.Api.Label, + }).Inc() + return Result{http.StatusForbidden, g.ForbiddenC, g.ForbiddenE} + } + + // 参数校验 + err = f.verifyParam(r) + if err != nil { + g.L.Info("BeforeRoute failed", zap.Error(err)) + return Result{http.StatusBadRequest, g.ParamInvalidC, g.ParamInvalidE} + } + + // 流量路由 + f.trafficRoute(r) + + // 检查当 + return Result{} +} + +// 调用目标服务前的hook +func (f *Filter) BeforeCall(r *req.Request) Result { + code, err := f.IncApiRate(r) + if err != nil { + // 统计被阻挡数 + stats.Limits.With(prometheus.Labels{ + "api_id": r.Api.APIID, + "service": r.Api.Service, + "label": r.Api.Label, + }).Inc() + return Result{code, g.AccessLimitedC, err.Error()} + } + + return Result{} +} + +// 调用目标服务完成后的hook +func (f *Filter) AfterCall(r *req.Request) Result { + // 并发数减1 + f.DecApiRate(r) + return Result{} +} + +func (f *Filter) RouteFailed(r *req.Request) Result { + return Result{} +} diff --git a/api/filter/param_verify.go b/api/filter/param_verify.go new file mode 100644 index 00000000..090befa8 --- /dev/null +++ b/api/filter/param_verify.go @@ -0,0 +1,32 @@ +package filter + +import ( + "fmt" + "regexp" + + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/misc" +) + +/* +对用户请求的参数进行验证 +*/ + +func (f *Filter) verifyParam(r *req.Request) error { + if r.Api.VerifyOn == misc.PARAM_VERIFY_OFF { + return nil + } + + for k, v := range r.Params { + regI, ok := r.Api.ParamRules.Load(k) + if !ok { + continue + } + reg := regI.(*regexp.Regexp) + if !reg.MatchString(v) { // 参数不合法 + return fmt.Errorf("param %s,value %s, verify failed", k, v) + } + } + + return nil +} diff --git a/api/filter/traffic_rate_limiter.go b/api/filter/traffic_rate_limiter.go new file mode 100644 index 00000000..9734c156 --- /dev/null +++ b/api/filter/traffic_rate_limiter.go @@ -0,0 +1,63 @@ +package filter + +import ( + "errors" + "net/http" + "strings" + + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "go.uber.org/zap" +) + +func (f *Filter) DecApiRate(r *req.Request) { + // 并发控制减1 + if r.Api.TrafficStrategy != 0 { + if r.TrafficStrategy.Concurrent != misc.STRATEGY_NO_LIMIT { + res := &misc.TrafficConRes{} + err := f.Rpc.Call("RateLimiter.DecApiRate", &misc.TrafficConReq{r.Api.APIID, r.Api.TrafficStrategy, ""}, &res) + if err != nil { + if !strings.Contains(err.Error(), "shut down") { + g.L.Warn("rpc出错了", zap.Error(err)) + } + } + } + } + +} + +func (f *Filter) IncApiRate(r *req.Request) (int, error) { + if r.Api.TrafficStrategy != 0 { + // 获取用户流量控制的参数和值 + var val string + param := r.TrafficStrategy.Param + if param != "" { + // 找到该参数对应的值,传给traffic服务,进行限制 + for k, v := range r.Params { + if k == param { + val = v + } + } + } + + if r.TrafficStrategy.QPS != misc.STRATEGY_NO_LIMIT || r.TrafficStrategy.Concurrent != misc.STRATEGY_NO_LIMIT || val != "" { // 至少有一项受到限制 + res := &misc.TrafficConRes{} + err := f.Rpc.Call("RateLimiter.IncApiRate", &misc.TrafficConReq{r.Api.APIID, r.Api.TrafficStrategy, val}, &res) + if err != nil { + if !strings.Contains(err.Error(), "shut down") { + g.L.Warn("rpc出错了", zap.Error(err)) + } + return 0, nil + } + + if !res.Suc { + return http.StatusTooManyRequests, errors.New("当前访问数过多,超过了预设的阀值") + } + } + } + + return 0, nil +} diff --git a/api/filter/traffic_route.go b/api/filter/traffic_route.go new file mode 100644 index 00000000..6d50e88e --- /dev/null +++ b/api/filter/traffic_route.go @@ -0,0 +1,48 @@ +package filter + +import ( + "math/rand" + "strings" + + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "go.uber.org/zap" +) + +// 检查是否触发traffic route +// 一旦触发,把本次请求的api name换成触发后的结果 +func (f *Filter) trafficRoute(r *req.Request) { + if r.Api.TrafficOn == misc.TRAFFIC_OFF { + // 流量路由未开启,返回 + return + } + + if r.Api.TrafficAPI == "" { + // 路由到的api name为空,返回 + return + } + + apiI, ok := misc.Apis.Load(r.Api.TrafficAPI) + if !ok { + // api不存在,返回 + return + } + api := apiI.(*misc.API) + // 是否在路由ip列表中,如果在,直接路由 + if strings.Contains(r.Api.TrafficIPs, r.ClientIP) { + g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "Canary by ip", zap.String("old_api", r.Api.APIID), zap.String("new_api", api.APIID), zap.String("client_ip", r.ClientIP)) + r.Api = api + return + } + + // 是否进行流量路由,落到了[0,trafficRatio]的区间,才进行路由 + n := rand.Intn(101) + if n > r.Api.TrafficRatio { + return + } + g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "Canary by random", zap.String("old_api", r.Api.APIID), zap.String("new_api", api.APIID)) + r.Api = api +} diff --git a/api/load_data.go b/api/load_data.go new file mode 100644 index 00000000..9627c6c0 --- /dev/null +++ b/api/load_data.go @@ -0,0 +1,182 @@ +package api + +import ( + "encoding/json" + "fmt" + "regexp" + "sync" + "time" + + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "github.com/sunface/talent" + "go.uber.org/zap" +) + +func (p *ApiServer) loadData() { + // 插入一条Test数据 + now := time.Now() + date := talent.Time2StringSecond(time.Now()) + version := talent.Time2Version(now) + g.DB.Exec(fmt.Sprintf("insert into api_release (service,api_id,description,mock_data,route_type,route_addr,create_date) values('admin','admin.test.get.v1','','',1,'http://httpbin.org/get','%s')", date)) + g.DB.Exec(fmt.Sprintf("insert into api_define (service,api_id,description,mock_data,route_type,route_addr,revise_version,release_version,create_date) values('admin','admin.test.get.v1','','',1,'http://httpbin.org/get','%s','%s','%s')", version, version, date)) + + lastLoadTime = time.Now() + // 加载所有数据 + p.loadAll() + + // 定时加载最新的信息 + go p.loadUpdated() +} + +var lastLoadTime time.Time + +func (p *ApiServer) loadAll() { + // 加载所有apis + apisS := make([]*misc.API, 0) + err := g.DB.Select(&apisS, "select * from api_release") + if err != nil { + g.L.Fatal("load apis error!", zap.Error(err)) + } + + an := make([]string, 0, len(apisS)) + for _, api := range apisS { + api.ParamRules = &sync.Map{} + an = append(an, api.APIID) + + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + // 解析param rules + var prs []*misc.ParamRule + json.Unmarshal(d, &prs) + + for _, pr := range prs { + reg, _ := regexp.Compile(pr.ParamRule) + api.ParamRules.Store(pr.Param, reg) + } + } + + } + + for _, api := range apisS { + misc.Apis.Store(api.APIID, api) + } + + // 加载所有strategy + strategies := make([]*misc.Strategy, 0) + err = g.DB.Select(&strategies, "select * from strategy") + if err != nil { + g.L.Fatal("load strategies error!", zap.Error(err)) + } + + for _, s := range strategies { + // 生成具体的策略内容 + switch s.Type { + case misc.STRATEGY_BWLIST: + t := &misc.BwStrategy{ + Type: s.Type, + BwList: make([]*misc.BW, 0), + } + json.Unmarshal([]byte(s.Content), &t.BwList) + s.DetailContent = t + case misc.STRATEGY_RETRY: + t := &misc.RetryStrategy{} + json.Unmarshal([]byte(s.Content), &t) + s.DetailContent = t + case misc.STRATEGY_TRAFFIC: + t := &misc.TrafficStrategy{} + json.Unmarshal([]byte(s.Content), &t) + s.DetailContent = t + } + + misc.Strategies.Store(s.ID, s) + } +} + +func (p *ApiServer) loadUpdated() { + wg := &sync.WaitGroup{} + for { + wg.Add(2) + // 为了防止访问时,恰好在更新数据,这里给出2秒的容忍值 + lastT := talent.Time2String(lastLoadTime.Add(-2 * time.Second)) + lastLoadTime = time.Now() + + // 加载apis + go func() { + defer wg.Done() + apisS := make([]*misc.API, 0) + + err := g.DB.Select(&apisS, fmt.Sprintf("select * from api_release where modify_date >= '%s'", lastT)) + if err != nil { + g.L.Error("load apis error!", zap.Error(err)) + return + } + + if len(apisS) == 0 { + return + } + + // 加载参数规则 + an := make([]string, 0, len(apisS)) + for _, api := range apisS { + api.ParamRules = &sync.Map{} + an = append(an, api.APIID) + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + // 解析param rules + var prs []*misc.ParamRule + json.Unmarshal(d, &prs) + + for _, pr := range prs { + reg, _ := regexp.Compile(pr.ParamRule) + api.ParamRules.Store(pr.Param, reg) + } + } + } + + for _, api := range apisS { + misc.Apis.Store(api.APIID, api) + } + }() + + // 加载策略 + go func() { + defer wg.Done() + + strategies := make([]*misc.Strategy, 0) + query := fmt.Sprintf("select * from strategy where modify_date >= '%s'", lastT) + err := g.DB.Select(&strategies, query) + if err != nil { + g.L.Error("load strategies error!", zap.Error(err), zap.String("query", query)) + return + } + + for _, s := range strategies { + // 生成具体的策略内容 + switch s.Type { + case misc.STRATEGY_BWLIST: + t := &misc.BwStrategy{ + Type: s.Type, + BwList: make([]*misc.BW, 0), + } + json.Unmarshal([]byte(s.Content), &t.BwList) + s.DetailContent = t + case misc.STRATEGY_RETRY: + t := &misc.RetryStrategy{} + json.Unmarshal([]byte(s.Content), &t) + s.DetailContent = t + case misc.STRATEGY_TRAFFIC: + t := &misc.TrafficStrategy{} + json.Unmarshal([]byte(s.Content), &t) + s.DetailContent = t + } + misc.Strategies.Store(s.ID, s) + } + }() + + wg.Wait() + time.Sleep(10 * time.Second) + } +} diff --git a/api/load_data_test.go b/api/load_data_test.go new file mode 100644 index 00000000..e89ba9c3 --- /dev/null +++ b/api/load_data_test.go @@ -0,0 +1,37 @@ +package api + +import ( + "sync" + "testing" + + "github.com/mafanr/juz/g" + + "github.com/stretchr/testify/assert" +) + +func TestLoadRow(t *testing.T) { + g.InitConfig("../tfe.conf") + g.InitLogger() + + g.InitMysql() + + p := &Proxy{} + p.apis = &sync.Map{} + p.loadAPIRow(g.TEST_API_NAME) + _, ok := p.apis.Load(g.TEST_API_NAME) + assert.True(t, ok) +} + +func TestLoadAll(t *testing.T) { + g.InitConfig("../tfe.conf") + g.InitLogger() + + g.InitMysql() + + p := &Proxy{} + p.apis = &sync.Map{} + p.loadAllAPIs() + + _, ok := p.apis.Load(g.TEST_API_NAME) + assert.True(t, ok) +} diff --git a/api/manage/api.go b/api/manage/api.go new file mode 100644 index 00000000..b98d400c --- /dev/null +++ b/api/manage/api.go @@ -0,0 +1,834 @@ +package manage + +import ( + "encoding/json" + "fmt" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/mafanr/juz/api/manage/audit" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "github.com/labstack/echo" + "github.com/sunface/talent" + "go.uber.org/zap" +) + +/*---------------------API相关-----------------------------*/ +func (m *Manage) QueryAPI(c echo.Context) error { + service := talent.FormValue(c, "service") + q := talent.FormValue(c, "q") + pageS := talent.FormValue(c, "page") + if service == "" || pageS == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + page, _ := strconv.Atoi(pageS) + if page <= 0 { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + role := talent.FormValue(c, "app_priv") + if !m.canView(role) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + apis := make([]*misc.API, 0) + var query string + if q == "" { + query = fmt.Sprintf("select * from api_define where service='%s' order by modify_date desc limit %d offset %d", service, g.PER_PAGE, g.PER_PAGE*(page-1)) + } else { + query = fmt.Sprintf("select * from api_define where service='%s' and ", service) + "api_id like '%" + q + "%' " + fmt.Sprintf(" order by modify_date desc limit %d offset %d", g.PER_PAGE, g.PER_PAGE*(page-1)) + } + + err := g.DB.Select(&apis, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + for _, api := range apis { + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + d1 := talent.Bytes2String(d) + api.ParamTable = &d1 + } + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: apis, + }) +} + +func (m *Manage) CountAPI(c echo.Context) error { + service := talent.FormValue(c, "service") + q := talent.FormValue(c, "q") + if service == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + var query string + if q == "" { + query = fmt.Sprintf("select count(1) from api_release where service='%s'", service) + } else { + query = fmt.Sprintf("select count(1) from api_release where service='%s' and api_id like '%%", service) + q + "%'" + } + + rows, err := g.DB.Query(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + var total int + rows.Next() + rows.Scan(&total) + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: total, + }) +} + +func (m *Manage) DefineAPI(c echo.Context) error { + api, ecode, emsg := m.parseAPI(c) + if ecode != 0 { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: ecode, + Message: emsg, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + action := talent.FormValue(c, "action") + now := time.Now() + date := talent.Time2StringSecond(now) + + pr := g.B64.EncodeToString(talent.String2Bytes(*api.ParamTable)) + if action == "create" { + query := fmt.Sprintf(`insert into api_define (api_id,path_type,service,description,route_type,route_addr,route_proto,bw_strategy,retry_strategy,traffic_strategy,mock_data,traffic_on,traffic_api,traffic_ratio,traffic_ips,verify_on,param_rules,cached_time,revise_version,create_date,label) + values ('%s','%d','%s','%s','%d','%s','%d','%d','%d','%d','%s','%d','%s','%d','%s','%d','%s','%d','%s', '%s','%s')`, + api.APIID, api.PathType, api.Service, *api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, api.BwStrategy, api.RetryStrategy, api.TrafficStrategy, *api.MockData, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, pr, api.CachedTime, talent.Time2Version(now), date, api.Label) + _, err := g.DB.Exec(query) + if err != nil { + if strings.Contains(err.Error(), g.DUP_KEY_ERR) { + return c.JSON(http.StatusConflict, g.Result{ + Status: http.StatusConflict, + ErrCode: g.AlreadyExistC, + Message: g.AlreadyExistE, + }) + } + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + // 初始化api发布表,并设置为未发布状态 + // 这里把不能再修改的值进行初始化 + query = fmt.Sprintf(`insert into api_release (api_id,path_type,service,route_addr,status,create_date) values ('%s','%d','%s','%s','%d','%s')`, + api.APIID, api.PathType, api.Service, api.RouteAddr, misc.API_OFFLINE, date) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + audit.Log(c.FormValue("username"), api.Service, audit.TypeApi, api.APIID, audit.OpCreate, c.FormValue("api"), "") + } else { + query := fmt.Sprintf("update api_define set description='%s',route_type='%d',route_addr='%s',route_proto='%d',bw_strategy='%d',retry_strategy='%d',traffic_strategy='%d',mock_data='%s',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s',cached_time='%d',label='%s' where api_id='%s'", + *api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, api.BwStrategy, api.RetryStrategy, api.TrafficStrategy, *api.MockData, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, pr, api.CachedTime, api.Label, api.APIID) + res, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + n, _ := res.RowsAffected() + if n == 0 { + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + ErrCode: g.NoUpdateHappenedC, + Message: g.NoUpdateHappenedE, + }) + } + + query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), api.APIID) + g.DB.Exec(query) + audit.Log(c.FormValue("username"), api.Service, audit.TypeApi, api.APIID, audit.OpEdit, c.FormValue("api"), "") + } + + // 查询并返回最新的api + api1 := misc.API{} + g.DB.Get(&api1, fmt.Sprintf("select * from api_define where api_id='%s'", api.APIID)) + + if api1.ParamTable != nil { + d, _ := g.B64.DecodeString(*api1.ParamTable) + d1 := talent.Bytes2String(d) + api1.ParamTable = &d1 + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: api1, + }) +} + +// API下线10分钟后,方可删除 +func (m *Manage) DeleteAPI(c echo.Context) error { + service := talent.FormValue(c, "service") + apiID := talent.FormValue(c, "api_id") + userID := talent.FormValue(c, "username") + if apiID == "" || userID == "" || service == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + // 查询API的发布状态,只有下线10分钟后,才能删除 + query := fmt.Sprintf("select status,modify_date from api_release where api_id='%s'", apiID) + rows, err := g.DB.Query(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + if !rows.Next() { + return c.JSON(http.StatusNotFound, g.Result{ + Status: http.StatusNotFound, + ErrCode: APIOfflineC, + Message: APIOfflineE, + }) + } + var status int + var ud string + rows.Scan(&status, &ud) + // 已经发布的不能删除 + if status == misc.API_RELEASED { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: ApiStillReleasedC, + Message: ApiStillReleasedE, + }) + } + + // 下线不到30秒,不能删除 + t, _ := talent.StringToTime(ud) + if time.Now().Sub(t) < 30*time.Second { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: ApiInactiveNotLongEnoughC, + Message: ApiInactiveNotLongEnoughE, + }) + } + + // 将api发布表的状态设置为已删除 + query = fmt.Sprintf("delete from api_release where api_id='%s'", apiID) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 查询api定义,为了记录审计日志 + api := misc.API{} + query = fmt.Sprintf("select * from api_define where api_id='%s'", apiID) + err = g.DB.Get(&api, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + } + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + d1 := talent.Bytes2String(d) + api.ParamTable = &d1 + } + + b, _ := json.Marshal(api) + + // 从api定义表删除 + query = fmt.Sprintf("delete from api_define where api_id='%s'", apiID) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + audit.Log(userID, service, audit.TypeApi, apiID, audit.OpDelete, talent.Bytes2String(b), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func (m *Manage) parseAPI(c echo.Context) (*misc.API, int, string) { + apiR := c.FormValue("api") + + api := &misc.API{} + err := json.Unmarshal([]byte(apiR), &api) + if err != nil { + g.L.Info("parse api", zap.Error(err), zap.String("api", string(apiR))) + return nil, g.ParamInvalidC, g.ParamInvalidE + } + + if api.Service == "" { + return nil, ServiceEmptyC, ServiceEmptyE + } + + if api.PathType != misc.OFF && api.PathType != misc.ON { + return nil, ApiPathTypeInvalidC, ApiPathTypeInvalidE + } + + if api.PathType == misc.OFF { + // 非路径格式,api名组成形式 + if !talent.OnlyAlphaNumAndDot(api.APIID) { + return nil, ApiOnlyAlphaNumAndDotC, ApiOnlyAlphaNumAndDotE + } + // 检查api id的前缀 + if !strings.HasPrefix(api.APIID, api.Service+".") { + return nil, ApiWithServicePrefixC, ApiWithServicePrefixE + } + } else { + // @todo,检查路径形式的参数是否合法,/a/b/c + //路径不能为保留的 + if !talent.OnlyAlphaNumAndUri(api.APIID) { + return nil, ApiOnlyAlphaNumAndUriC, ApiOnlyAlphaNumAndUriE + } + + name := api.APIID[:len(api.APIID)-3] + if name == "/service/api" || name == "/notify" { + return nil, ApiReservePathC, ApiReservePathE + } + } + + // 检查api id的后缀 + if api.APIID[len(api.APIID)-2] != 'v' || api.APIID[len(api.APIID)-3] != '.' { + return nil, ApiWithServiceSuffixC, ApiWithServiceSuffixE + } + + if api.RouteAddr == "" || strings.TrimSpace(api.RouteAddr) == "http://" || strings.TrimSpace(api.RouteAddr) == "https://" { + return nil, RouteAddrEmptyC, RouteAddrEmptyE + } + + if api.RouteAddr != "" { + if !strings.HasPrefix(api.RouteAddr, "http") { + return nil, RouteAddrWithHTTPPrefixC, RouteAddrWithHTTPPrefixE + } + } + + if (api.RouteProto != 1) && (api.RouteProto != 2) { + return nil, RouteProtoInvalidC, RouteProtoInvalidE + } + + if api.TrafficAPI != "" { + _, ok := misc.Apis.Load(api.TrafficAPI) + if !ok { + return nil, ApiNotExistC, ApiNotExistE + } + } + + if api.TrafficRatio < 0 || api.TrafficRatio > 100 { + return nil, TrafficRatioInvalidC, TrafficRatioInvalidE + } + + if (api.TrafficOn != 0) && (api.TrafficOn != 1) { + api.TrafficOn = 0 + } + + if (api.VerifyOn != 0) && (api.VerifyOn != 1) { + api.VerifyOn = 0 + } + + if (api.CachedTime < 0) || (api.CachedTime > 30) { + api.CachedTime = 0 + } + + return api, 0, "" +} + +func (m *Manage) APIRelease(c echo.Context) error { + apiID := c.FormValue("api_id") + userID := c.FormValue("username") + + if apiID == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + // 查询最新的api定义 + api := misc.API{} + query := fmt.Sprintf("select * from api_define where api_id='%s'", apiID) + err := g.DB.Get(&api, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 更新release + query = fmt.Sprintf("update api_release set description='%s',route_type='%d',route_addr='%s',route_proto='%d',mock_data='%s',retry_strategy='%d',bw_strategy='%d',traffic_strategy='%d',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s', cached_time='%d',status='%d',label='%s' where api_id='%s'", + *api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, *api.MockData, api.RetryStrategy, api.BwStrategy, api.TrafficStrategy, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, *api.ParamTable, api.CachedTime, misc.API_RELEASED, api.Label, api.APIID) + + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", api.ReviseVersion, api.APIID) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + d1 := talent.Bytes2String(d) + api.ParamTable = &d1 + } + b, _ := json.Marshal(api) + audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpRelease, talent.Bytes2String(b), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func (m *Manage) APIOffline(c echo.Context) error { + apiID := c.FormValue("api_id") + userID := c.FormValue("username") + + if apiID == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + query := fmt.Sprintf("update api_release set status='%d' where api_id='%s'", + misc.API_OFFLINE, apiID) + _, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 更新api_define表的发布时间 + query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", "", apiID) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 查询已发布的api定义 + api := misc.API{} + query = fmt.Sprintf("select * from api_release where api_id='%s'", apiID) + err = g.DB.Get(&api, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + } + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + d1 := talent.Bytes2String(d) + api.ParamTable = &d1 + } + + b, _ := json.Marshal(api) + audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpOffline, talent.Bytes2String(b), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +// 请求参数验证 +func (m *Manage) VerifyParamRule(c echo.Context) error { + param := strings.TrimSpace(c.FormValue("param")) + rule := strings.TrimSpace(c.FormValue("rule")) + testData := strings.TrimSpace(c.FormValue("test_data")) + if param == "" || rule == "" || testData == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + // 验证正则是否合法 + r, err := regexp.Compile(rule) + if err != nil { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: InvalidVerifyTableC, + Message: InvalidVerifyTableE, + }) + } + // 验证测试数据跟正则是否匹配 + if !r.Match([]byte(testData)) { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: InvalidVerifyRuleC, + Message: InvalidVerifyRuleE, + }) + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +//批量设置策略 +func (m *Manage) APIBatchStrategy(c echo.Context) error { + apiIDS := strings.TrimSpace(c.FormValue("api_ids")) + bw := strings.TrimSpace(c.FormValue("batch_bw")) + retry := strings.TrimSpace(c.FormValue("batch_retry")) + traffic := strings.TrimSpace(c.FormValue("batch_traffic")) + service := c.FormValue("service") + if apiIDS == "" || (bw == "" && retry == "" && traffic == "") || service == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + var apiIds []string + err := json.Unmarshal([]byte(apiIDS), &apiIds) + if err != nil { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + now := time.Now() + for _, apiID := range apiIds { + var query = "update api_define set" + if bw != "" { + query = fmt.Sprintf("%s bw_strategy='%s', ", query, bw) + } + if retry != "" { + query = fmt.Sprintf("%s retry_strategy='%s', ", query, retry) + } + if traffic != "" { + query = fmt.Sprintf("%s traffic_strategy='%s'", query, traffic) + } + + query = fmt.Sprintf("%s where api_id = '%s'", query, apiID) + res, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + n, _ := res.RowsAffected() + if n == 0 { + continue + } + // 更新版本号 + query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), apiID) + g.DB.Exec(query) + } + + userID := c.FormValue("username") + audit.Log(userID, service, audit.TypeBatch, fmt.Sprintf("bw: %s,retry: %s", bw, retry), audit.OpCreate, apiIDS, "批量添加策略") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +// 批量删除策略 +func (m *Manage) APIBatchDelStrategy(c echo.Context) error { + apiIDS := strings.TrimSpace(c.FormValue("api_ids")) + service := c.FormValue("service") + if apiIDS == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + var apiIds []string + err := json.Unmarshal([]byte(apiIDS), &apiIds) + if err != nil { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + tp, _ := strconv.Atoi(c.FormValue("type")) + + now := time.Now() + for _, apiID := range apiIds { + var query string + switch tp { + case misc.STRATEGY_BWLIST: + query = fmt.Sprintf("update api_define set bw_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID) + case misc.STRATEGY_RETRY: + query = fmt.Sprintf("update api_define set retry_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID) + case misc.STRATEGY_TRAFFIC: + query = fmt.Sprintf("update api_define set traffic_strategy='%d' where api_id = '%s'", misc.STRATEGY_EMPTY, apiID) + default: + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + res, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + n, _ := res.RowsAffected() + if n == 0 { + continue + } + // 更新版本号 + query = fmt.Sprintf("update api_define set revise_version='%s' where api_id='%s'", talent.Time2Version(now), apiID) + g.DB.Exec(query) + } + + userID := c.FormValue("username") + var msg string + switch tp { + case misc.STRATEGY_BWLIST: + msg = "White/Black List" + case misc.STRATEGY_RETRY: + msg = "Timeout/Retry" + case misc.STRATEGY_TRAFFIC: + msg = "Traffic Control" + } + audit.Log(userID, service, audit.TypeBatch, msg, audit.OpDelete, apiIDS, "Batch delete strategy") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func (m *Manage) APIBatchRelease(c echo.Context) error { + apiIDS := strings.TrimSpace(c.FormValue("api_ids")) + if apiIDS == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + var apiIds []string + err := json.Unmarshal([]byte(apiIDS), &apiIds) + if err != nil { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + //必须是该service的管理员或者创建者 + if !m.canOperate(talent.FormValue(c, "app_priv")) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + for _, apiID := range apiIds { + // 查询最新的api定义 + api := misc.API{} + query := fmt.Sprintf("select * from api_define where api_id='%s'", apiID) + err := g.DB.Get(&api, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 更新release + query = fmt.Sprintf("update api_release set description='%s',route_type='%d',route_addr='%s',route_proto='%d',mock_data='%s',retry_strategy='%d',bw_strategy='%d',traffic_on='%d',traffic_api='%s',traffic_ratio='%d',traffic_ips='%s',verify_on='%d',param_rules='%s', cached_time='%d',status='%d',label='%s' where api_id='%s'", + *api.Desc, api.RouteType, api.RouteAddr, api.RouteProto, *api.MockData, api.RetryStrategy, api.BwStrategy, api.TrafficOn, api.TrafficAPI, api.TrafficRatio, api.TrafficIPs, api.VerifyOn, *api.ParamTable, api.CachedTime, misc.API_RELEASED, api.Label, api.APIID) + + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + query = fmt.Sprintf("update api_define set release_version='%s' where api_id='%s'", api.ReviseVersion, api.APIID) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + userID := c.FormValue("username") + if api.ParamTable != nil { + d, _ := g.B64.DecodeString(*api.ParamTable) + d1 := talent.Bytes2String(d) + api.ParamTable = &d1 + } + b, _ := json.Marshal(api) + audit.Log(userID, api.Service, audit.TypeApi, api.APIID, audit.OpRelease, talent.Bytes2String(b), "") + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} diff --git a/api/manage/audit/audit_log.go b/api/manage/audit/audit_log.go new file mode 100644 index 00000000..f27f60eb --- /dev/null +++ b/api/manage/audit/audit_log.go @@ -0,0 +1,136 @@ +package audit + +import ( + "fmt" + "net/http" + "strconv" + + "github.com/mafanr/g" + + "github.com/sunface/talent" + + "github.com/labstack/echo" + "go.uber.org/zap" +) + +const ( + TypeService = 1 + TypeApi = 2 + TypeStrategy = 3 + TypePrivilegy = 4 + TypeBatch = 5 + + OpCreate = 1 + OpEdit = 2 + OpRelease = 3 + OpOffline = 4 + OpDelete = 5 +) + +func Log(userID string, service string, targetType int, targetID string, opType int, content string, desc string) { + newc := g.B64.EncodeToString(talent.String2Bytes(content)) + query := fmt.Sprintf("insert into audit_log (user_id,service,target_type,target_id,op_type,content,description) values ('%s','%s','%d','%s','%d','%s','%s')", + userID, service, targetType, targetID, opType, newc, desc) + _, err := g.DB.Exec(query) + if err != nil { + g.L.Info("record audit log error", zap.Error(err), zap.String("query", query)) + } +} + +func Count(c echo.Context) error { + tt := c.FormValue("target_type") + tid := c.FormValue("target_id") + if tt == "" || tid == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + var query string + if tt == "0" { + query = fmt.Sprintf("select count(1) from audit_log where service in (%s)", tid) + } else { + query = fmt.Sprintf("select count(1) from audit_log where target_id='%s' and target_type='%s'", tid, tt) + } + rows, err := g.DB.Query(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + var total int + rows.Next() + rows.Scan(&total) + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: total, + }) +} + +type AuditLog struct { + ID int `db:"id" json:"-"` + UserID string `db:"user_id" json:"user_id"` + Service string `db:"service" json:"service"` + TargetType string `db:"target_type" json:"target_type"` + TargetID string `db:"target_id" json:"target_id"` + OpType string `db:"op_type" json:"op_type"` + Content string `db:"content" json:"content"` + Desc string `db:"description" json:"desc"` + ModifyDate string `db:"modify_date" json:"modify_date"` +} + +func Load(c echo.Context) error { + tt := c.FormValue("target_type") + tid := c.FormValue("target_id") + pageS := c.FormValue("page") + + if tt == "" || tid == "" || pageS == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + page, _ := strconv.Atoi(pageS) + if page <= 0 { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + rs := make([]AuditLog, 0) + var query string + if tt == "0" { + query = fmt.Sprintf("select * from audit_log where service in (%s) order by modify_date desc limit %d offset %d", tid, g.PER_PAGE, g.PER_PAGE*(page-1)) + } else { + query = fmt.Sprintf("select * from audit_log where target_id='%s' and target_type='%s' order by modify_date desc limit %d offset %d", tid, tt, g.PER_PAGE, g.PER_PAGE*(page-1)) + } + err := g.DB.Select(&rs, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + for i, l := range rs { + b, _ := g.B64.DecodeString(l.Content) + rs[i].Content = talent.Bytes2String(b) + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: rs, + }) +} diff --git a/api/manage/ecode.go b/api/manage/ecode.go new file mode 100644 index 00000000..82c64bd7 --- /dev/null +++ b/api/manage/ecode.go @@ -0,0 +1,68 @@ +package manage + +const ( + CodeInvalidUser = 10001 + ErrInvalidUser = "Username invalid" + + InvalidVerifyTableC = 10002 + InvalidVerifyTableE = "Param verify rule is not a legal regexp" + + InvalidVerifyRuleC = 10003 + InvalidVerifyRuleE = "The test data is not satisfying the verify rule" + + ApiWithServicePrefixC = 10004 + ApiWithServicePrefixE = "API name must be prefix with service name" + ApiWithServiceSuffixC = 10014 + ApiWithServiceSuffixE = "API ID must suffix with version" + + ServiceEmptyC = 10005 + ServiceEmptyE = "Service name cant be empty" + + ApiOnlyAlphaNumAndDotC = 10006 + ApiOnlyAlphaNumAndDotE = "API name can only be consisted of alphabet and numberic" + + RouteAddrWithHTTPPrefixC = 10007 + RouteAddrWithHTTPPrefixE = "Backend url must prefix with http:// or https://" + + RouteAddrEmptyC = 10008 + RouteAddrEmptyE = "Backend url cant be empty" + + RouteProtoInvalidC = 10009 + RouteProtoInvalidE = "Backend type invalid" + + ReqTimeoutInvalidC = 10010 + ReqTimeoutInvalidE = "Timeout must be in (0,60]" + + RetryTimesInvalidC = 10011 + RetryTimesInvalidE = "Retry times must be in (0,5]" + + RetryIntvInvalidC = 10012 + RetryIntvInvalidE = "Retry interval must be in (0,30]" + + TrafficRatioInvalidC = 10013 + TrafficRatioInvalidE = "Traffic ratio must be in [0,100]" + + ApiPathTypeInvalidC = 10014 + ApiPathTypeInvalidE = "API URL type must be 0 or 1" + + ApiNotExistE = "Api not exist" + ApiNotExistC = 10015 + + ApiOnlyAlphaNumAndUriC = 10016 + ApiOnlyAlphaNumAndUriE = "API name can only be consisted of alphabet,numberic and /" + + ApiReservePathC = 10017 + ApiReservePathE = "You cant use the reserverd name" + + APIOfflineE = "API not released" + APIOfflineC = 1058 + + ApiStillReleasedE = "API still being released" + ApiStillReleasedC = 1059 + + ApiInactiveNotLongEnoughE = "You cant delete api until 30 seconds after offline" + ApiInactiveNotLongEnoughC = 1060 + + StrategyNameExistE = "Strategy name already exist" + StrategyNameExistc = 1061 +) diff --git a/api/manage/labels.go b/api/manage/labels.go new file mode 100644 index 00000000..87c2942c --- /dev/null +++ b/api/manage/labels.go @@ -0,0 +1,80 @@ +package manage + +import ( + "fmt" + "net/http" + "strings" + + "github.com/mafanr/g" + + "github.com/labstack/echo" + "go.uber.org/zap" +) + +func (m *Manage) QueryLabels(c echo.Context) error { + service := c.FormValue("service") + if service == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + query := fmt.Sprintf("select name from labels where service='%s'", service) + rows, err := g.DB.Query(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + labels := make([]string, 0) + for rows.Next() { + var l string + rows.Scan(&l) + labels = append(labels, l) + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: labels, + }) +} + +func (m *Manage) CreateLabel(c echo.Context) error { + service := c.FormValue("service") + name := c.FormValue("name") + if service == "" || name == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + query := fmt.Sprintf("insert into labels (service,name) values ('%s','%s')", service, name) + _, err := g.DB.Exec(query) + if err != nil { + if strings.Contains(err.Error(), g.DUP_KEY_ERR) { + return c.JSON(http.StatusConflict, g.Result{ + Status: http.StatusConflict, + ErrCode: g.AlreadyExistC, + Message: g.AlreadyExistE, + }) + } + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} diff --git a/api/manage/manage.go b/api/manage/manage.go new file mode 100644 index 00000000..b3968972 --- /dev/null +++ b/api/manage/manage.go @@ -0,0 +1,149 @@ +package manage + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/mafanr/juz/api/manage/audit" + "github.com/mafanr/juz/api/manage/strategy" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "go.uber.org/zap" + + "github.com/labstack/echo" + "github.com/sunface/talent" +) + +type Manage struct{} + +func (m *Manage) Start() { + registerToEtcd() + + e := echo.New() + //api管理 + e.POST("/manage/api/query", m.QueryAPI, auth) + e.POST("/manage/api/count", m.CountAPI, auth) + e.POST("/manage/api/define", m.DefineAPI, auth) + e.POST("/manage/api/delete", m.DeleteAPI, auth) + e.POST("/manage/api/verifyParamRule", m.VerifyParamRule, auth) + + e.POST("/manage/api/release", m.APIRelease, auth) + e.POST("/manage/api/batchRelease", m.APIBatchRelease, auth) + e.POST("/manage/api/offline", m.APIOffline, auth) + + e.POST("/manage/api/batchStrategy", m.APIBatchStrategy, auth) + e.POST("/manage/api/batchDelStrategy", m.APIBatchDelStrategy, auth) + + //策略管理 + e.POST("/manage/strategy/create", strategy.Create, auth) + e.POST("/manage/strategy/update", strategy.Update, auth) + e.POST("/manage/strategy/load", strategy.Load, auth) + e.POST("/manage/strategy/change", strategy.Change, auth) + + e.POST("/manage/strategy/delete", strategy.Delete, auth) + e.POST("/manage/strategy/query", strategy.Query, auth) + // e.POST("/manage/strategy/api", strategy.Api, auth) + + // 审计日志 + e.POST("/manage/auditLog/count", audit.Count, auth) + e.POST("/manage/auditLog/load", audit.Load, auth) + + // 标签分组 + e.POST("/manage/labels/query", m.QueryLabels, auth) + e.POST("/manage/labels/create", m.CreateLabel, auth) + + e.Logger.Fatal(e.Start(":" + misc.Conf.Manage.Port)) +} + +func auth(f echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + if c.FormValue("admin_token") != misc.Conf.Common.AdminToken { + return c.JSON(http.StatusUnauthorized, g.Result{ + Status: http.StatusUnauthorized, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + return f(c) + } +} + +func registerToEtcd() { + g.EtcdCli = g.InitEtcd(misc.Conf.Etcd.Addrs) + + // 保存服务状态到etcd + ip := talent.LocalIP() + fmt.Println("local ip:", ip) + + host := ip + ":" + misc.Conf.Manage.Port + go func() { + for { + err := g.StoreServer(g.EtcdCli, &g.ServerInfo{g.TFEManage, host, 0}) + if err != nil { + g.L.Error("Store to etcd error", zap.Error(err)) + } + + time.Sleep(time.Second * g.ServiceStoreInterval) + } + }() +} + +func validUserID(s string) bool { + i, err := strconv.Atoi(s) + if err != nil { + return false + } + if i == 0 { + return false + } + return true +} + +func (m *Manage) serviceExist(service string) bool { + // 验证service是否存在 + var temp interface{} + query := fmt.Sprintf("select id from service where name ='%s'", service) + err := g.DB.Get(&temp, query) + if err != nil { + return false + } + + return true +} + +func (m *Manage) canView(priv string) bool { + if priv == g.PRIV_GUEST { + return false + } + return true +} + +func (m *Manage) canOperate(priv string) bool { + if priv == g.PRIV_ADMIN || priv == g.PRIV_OWNER { + return true + } + + return false +} + +func isServiceCreator(userID string, service string) bool { + // 验证是否是service创建者 + var temp interface{} + query := fmt.Sprintf("select id from service where name ='%s' and creator='%s'", service, userID) + err := g.DB.Get(&temp, query) + if err == nil { + // 是创建者 + return true + } + + return false +} + +func getServiceByApiName(apiName string) string { + return strings.Split(apiName, ".")[0] +} diff --git a/api/manage/strategy/strategy.go b/api/manage/strategy/strategy.go new file mode 100644 index 00000000..f1362de0 --- /dev/null +++ b/api/manage/strategy/strategy.go @@ -0,0 +1,340 @@ +package strategy + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/mafanr/juz/api/manage/audit" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "github.com/labstack/echo" + "github.com/sunface/talent" + "go.uber.org/zap" +) + +func parse(c echo.Context) (*misc.Strategy, int, string) { + str := c.FormValue("strategy") + if str == "" { + return nil, g.ParamEmptyC, g.ParamEmptyE + } + + st := &misc.Strategy{} + err := json.Unmarshal([]byte(str), &st) + if err != nil { + return nil, g.ParamInvalidC, g.ParamInvalidE + } + + if st.Name == "" || st.Service == "" { + return nil, g.ParamInvalidC, g.ParamInvalidE + } + return st, 0, "" +} + +func Create(c echo.Context) error { + st, ecode, emsg := parse(c) + if ecode != 0 { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: ecode, + Message: emsg, + }) + } + + if !canOperate(c, st.Service) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + query := fmt.Sprintf("insert into strategy (name,service,type,sub_type,content,create_date) values ('%s','%s','%d','%d','%s','%s')", + st.Name, st.Service, st.Type, st.SubType, st.Content, talent.Time2StringSecond(time.Now())) + res, err := g.DB.Exec(query) + if err != nil { + if strings.Contains(err.Error(), g.DUP_KEY_ERR) { + return c.JSON(http.StatusConflict, g.Result{ + Status: http.StatusConflict, + ErrCode: g.AlreadyExistC, + Message: "Strategy name already exist", + }) + } + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + id, _ := res.LastInsertId() + audit.Log(c.FormValue("username"), st.Service, audit.TypeStrategy, fmt.Sprintf("%d:%s", id, st.Name), audit.OpCreate, c.FormValue("strategy"), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func Update(c echo.Context) error { + st, ecode, emsg := parse(c) + if ecode != 0 { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: ecode, + Message: emsg, + }) + } + + if !canOperate(c, st.Service) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + query := fmt.Sprintf("update strategy set name='%s',sub_type='%d',content='%s' where id ='%d'", + st.Name, st.SubType, st.Content, st.ID) + _, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + audit.Log(c.FormValue("username"), st.Service, audit.TypeStrategy, fmt.Sprintf("%d:%s", st.ID, st.Name), audit.OpEdit, c.FormValue("strategy"), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func Load(c echo.Context) error { + service := c.FormValue("service") + tp := c.FormValue("type") + if service == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + ss := make([]*misc.Strategy, 0) + var query string + if tp == "0" { + query = fmt.Sprintf("select * from strategy where service ='%s' order by modify_date desc", service) + } else { + query = fmt.Sprintf("select * from strategy where service ='%s' and type='%s' order by modify_date desc", service, tp) + } + + err := g.DB.Select(&ss, query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: ss, + }) +} + +func Delete(c echo.Context) error { + service := c.FormValue("service") + id := c.FormValue("id") + name := c.FormValue("name") + tp, _ := strconv.Atoi(c.FormValue("type")) + if service == "" || id == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + if !canOperate(c, service) { + return c.JSON(http.StatusForbidden, g.Result{ + Status: http.StatusForbidden, + ErrCode: g.ForbiddenC, + Message: g.ForbiddenE, + }) + } + + var query string + switch tp { + case misc.STRATEGY_BWLIST: + query = fmt.Sprintf("update api_define set bw_strategy='%d' where bw_strategy='%s'", 0, id) + case misc.STRATEGY_RETRY: + query = fmt.Sprintf("update api_define set retry_strategy='%d' where retry_strategy='%s'", 0, id) + case misc.STRATEGY_TRAFFIC: + query = fmt.Sprintf("update api_define set traffic_strategy='%d' where traffic_strategy='%s'", 0, id) + default: + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + _, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + switch tp { + case misc.STRATEGY_BWLIST: + query = fmt.Sprintf("update api_release set bw_strategy='%d' where bw_strategy='%s'", 0, id) + case misc.STRATEGY_RETRY: + query = fmt.Sprintf("update api_release set retry_strategy='%d' where retry_strategy='%s'", 0, id) + case misc.STRATEGY_TRAFFIC: + query = fmt.Sprintf("update api_release set traffic_strategy='%d' where traffic_strategy='%s'", 0, id) + } + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + query = fmt.Sprintf("delete from strategy where id='%s'", id) + _, err = g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + audit.Log(c.FormValue("username"), service, audit.TypeStrategy, fmt.Sprintf("%s:%s", id, name), audit.OpDelete, "", "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func Change(c echo.Context) error { + status, _ := strconv.Atoi(c.FormValue("status")) + id := c.FormValue("id") + if id == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + newS := 0 + op := 0 + switch status { + case misc.STRATEGY_ON: + newS = misc.STRATEGY_OFF + op = audit.OpOffline + case misc.STRATEGY_OFF: + newS = misc.STRATEGY_ON + op = audit.OpRelease + default: + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamInvalidC, + Message: g.ParamInvalidE, + }) + } + + query := fmt.Sprintf("update strategy set status='%d' where id='%s'", newS, id) + _, err := g.DB.Exec(query) + if err != nil { + g.L.Info("access database error", zap.Error(err), zap.String("query", query)) + return c.JSON(http.StatusInternalServerError, g.Result{ + Status: http.StatusInternalServerError, + ErrCode: g.DatabaseC, + Message: g.DatabaseE, + }) + } + + // 查询发布的strategy内容 + s := misc.Strategy{} + g.DB.Get(&s, fmt.Sprintf("select * from strategy where id='%s'", id)) + d, _ := json.Marshal(s) + audit.Log(c.FormValue("username"), s.Service, audit.TypeStrategy, fmt.Sprintf("%s:%s", id, s.Name), op, talent.Bytes2String(d), "") + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + }) +} + +func Query(c echo.Context) error { + id := c.FormValue("id") + if id == "" { + return c.JSON(http.StatusBadRequest, g.Result{ + Status: http.StatusBadRequest, + ErrCode: g.ParamEmptyC, + Message: g.ParamEmptyE, + }) + } + + s := misc.Strategy{} + g.DB.Get(&s, fmt.Sprintf("select * from strategy where id='%s'", id)) + + return c.JSON(http.StatusOK, g.Result{ + Status: http.StatusOK, + Data: s, + }) +} + +func canOperate(c echo.Context, service string) bool { + role := c.FormValue("app_priv") + userID := c.FormValue("username") + + if role == g.ROLE_NORMAL { + // 验证是否是service创建者 + var temp interface{} + query := fmt.Sprintf("select id from service where name ='%s' and creator='%s'", service, userID) + err := g.DB.Get(&temp, query) + if err == nil { + // 是创建者 + return true + } + + // 验证是否是管理员 + query = fmt.Sprintf("select privilege from privilege where user_id='%s' and service='%s'", userID, service) + rows, err := g.DB.Query(query) + if !rows.Next() { + // 不存在该用户的权限 + return false + } + + var priv int + rows.Scan(&priv) + if priv == misc.PRIVILEGE_ADMIN { + // 是管理员 + return true + } + + return false + } else { //是应用级别的管理员 + return true + } +} diff --git a/api/req/req.go b/api/req/req.go new file mode 100644 index 00000000..203e7f67 --- /dev/null +++ b/api/req/req.go @@ -0,0 +1,144 @@ +package req + +import ( + "errors" + "fmt" + "net/http" + + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "github.com/labstack/echo" +) + +type Request struct { + Rid int64 + Method string + Params map[string]string + Cookies []*http.Cookie + Api *misc.API + DebugOn bool + ClientIP string + + BwStrategy *misc.BwStrategy + RetryStrategy *misc.RetryStrategy + TrafficStrategy *misc.TrafficStrategy +} + +func (r *Request) String() string { + return fmt.Sprintf("method: %s,params: %v, api: %v, client_ip: %s", r.Method, r.Params, *r.Api, r.ClientIP) +} + +func Parse(c echo.Context) (*Request, error) { + r := &Request{ + Rid: c.Get("rid").(int64), + Params: make(map[string]string), + } + + // 解析参数 + ps, _ := c.FormParams() + for k, v := range ps { + // debug参数不透传 + if k == g.DEBUG_PARAM { + if v[0] == "true" { + r.DebugOn = true + } + } else { + r.Params[k] = v[0] + } + } + + // 解析cookie + r.Cookies = c.Cookies() + + r.ClientIP = c.RealIP() + + r.Method = c.Request().Method + + var apiName string + var version string + + // 判断是路径映射还是api映射 + uri := c.Request().URL.Path + if uri == "/service/api" { + apiName = r.Params["service_id"] + if apiName == "" { + //新网关使用以下参数'api_name' + apiName = r.Params["api_name"] + if apiName == "" { + return r, errors.New("api_name not founded") + } + } + } else { + apiName = uri + } + + version = r.Params["api_version"] + if version == "" { + // 如果没有传version,就默认为版本1 + version = "1" + } + + apiID := apiName + ".v" + version + // 获取api信息 + apiI, ok := misc.Apis.Load(apiID) + if !ok { + return r, errors.New("api id not exist") + } + r.Api = apiI.(*misc.API) + + // 生成策略 + strategy(r) + + return r, nil +} + +func strategy(r *Request) { + // 设置策略停用时的默认值 + r.BwStrategy = &misc.BwStrategy{ + Type: 0, + } + if r.Api.BwStrategy != 0 { + s1, ok := misc.Strategies.Load(r.Api.BwStrategy) + if ok { + s := s1.(*misc.Strategy) + if s.Status == misc.STRATEGY_ON { + r.BwStrategy = s.DetailContent.(*misc.BwStrategy) + } + } + } + + // 设置策略停用时的默认值 + r.RetryStrategy = &misc.RetryStrategy{ + ReqTimeout: misc.REQ_TIMEOUT, + RetryTimes: misc.RETRY_TIMES, + RetryInterval: misc.RETRY_INTERVAL, + } + if r.Api.RetryStrategy != 0 { + s1, ok := misc.Strategies.Load(r.Api.RetryStrategy) + if ok { + s := s1.(*misc.Strategy) + if s.Status == misc.STRATEGY_ON { + r.RetryStrategy = s.DetailContent.(*misc.RetryStrategy) + } + } + } + + // 设置策略停用时的默认值 + r.TrafficStrategy = &misc.TrafficStrategy{ + QPS: misc.STRATEGY_NO_LIMIT, + Concurrent: misc.STRATEGY_NO_LIMIT, + Param: "", + } + + if r.Api.TrafficStrategy != 0 { + s1, ok := misc.Strategies.Load(r.Api.TrafficStrategy) + if ok { + s := s1.(*misc.Strategy) + if s.Status == misc.STRATEGY_ON { + r.TrafficStrategy = s.DetailContent.(*misc.TrafficStrategy) + } + } + } +} diff --git a/api/router.go b/api/router.go new file mode 100644 index 00000000..2287f85c --- /dev/null +++ b/api/router.go @@ -0,0 +1,170 @@ +package api + +import ( + "errors" + "net/http" + "strconv" + "time" + + "github.com/mafanr/juz/api/filter" + "github.com/mafanr/juz/api/manage" + "github.com/mafanr/juz/api/req" + "github.com/mafanr/juz/api/stats" + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "go.uber.org/zap" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sunface/talent" + + "github.com/labstack/echo" + "github.com/valyala/fasthttp" +) + +/* 请求路由模块,所有进入的请求都在这里被处理和路由*/ +const ( + SYNC = 1 + REDIRECT = 2 + ASYNC = 9 +) + +type router struct { + apiServer *ApiServer + *filter.Filter +} + +/*----------------------请求路由---------------------*/ +func (router *router) route(c echo.Context) error { + // 解析请求 + r, err := req.Parse(c) + if err != nil { + c.Set("api_id", "error_api_id") + c.Set("service", "error_service") + c.Set("label", "error_label") + c.Set("error_msg", err) + return c.JSON(http.StatusBadRequest, g.Result{r.Rid, http.StatusBadRequest, g.ParamInvalidC, err.Error(), nil}) + } + c.Set("api_id", r.Api.APIID) + c.Set("service", r.Api.Service) + c.Set("label", r.Api.Label) + + g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "request content", zap.Int64("rid", r.Rid), zap.String("req", r.String())) + + // 判断api是否发布 + if r.Api.Status != misc.API_RELEASED { + return c.JSON(http.StatusBadRequest, g.Result{r.Rid, http.StatusBadRequest, manage.APIOfflineC, manage.APIOfflineE, nil}) + } + + // 在请求路由之前进行过滤 + res := router.BeforeRoute(r) + if res.Status != 0 { + c.Set("error_msg", errors.New(res.Emsg)) + return c.JSON(res.Status, g.Result{r.Rid, res.Status, res.Ecode, res.Emsg, nil}) + } + + // 开始请求 + var code int + var body []byte + + switch r.Api.RouteType { + case SYNC: // 同步请求 + code, body, err = router.sync(r) + case REDIRECT: //重定向 + return router.redirect(c, r) + } + + // 请求失败,通知客户端 + if err != nil { + c.Set("error_msg", err) + return c.JSON(code, g.Result{r.Rid, code, g.ReqFailedC, err.Error(), nil}) + } + + g.Debug(r.DebugOn, misc.Conf.Common.LogLevel, "response body", zap.Int64("rid", r.Rid), zap.Int("code", code), zap.String("body", talent.Bytes2String(body))) + + // 成功时,把请求id放在header中返回,避免污染返回结果 + c.Response().Header().Add("rid", strconv.FormatInt(r.Rid, 10)) + + // 返回给客户端成功的结果 + return c.String(code, talent.Bytes2String(body)) +} + +func (rt *router) redirect(c echo.Context, r *req.Request) error { + // 组装参数 + url := r.Api.RouteAddr + "?" + c.QueryString() + return c.Redirect(http.StatusMovedPermanently, url) +} + +func (rt *router) sync(r *req.Request) (int, []byte, error) { + args := &fasthttp.Args{} + for k, v := range r.Params { + args.Set(k, v) + } + + req := &fasthttp.Request{} + resp := &fasthttp.Response{} + + // 放入自定义cookie信息 + for _, ck := range r.Cookies { + req.Header.SetCookie(ck.Name, ck.Value) + } + + // 请求头部加入request id,方便后续业务进行跟踪 + req.Header.Set("rid", strconv.FormatInt(r.Rid, 10)) + + req.Header.SetMethod(r.Method) + + // 写入客户端真实ip + req.Header.Set("X-Forwarded-For", r.ClientIP) + + url := r.Api.RouteAddr + switch r.Method { + case "GET": + // 拼接url + url = url + "?" + args.String() + default: + args.WriteTo(req.BodyWriter()) + } + + req.SetRequestURI(url) + + // 超时重试 + retrys := 0 + var err error + + for { + res := rt.Filter.BeforeCall(r) + if res.Status != 0 { + return res.Status, nil, errors.New(res.Emsg) + } + + err = g.Cli.DoTimeout(req, resp, time.Duration(r.RetryStrategy.ReqTimeout)*time.Second) + // time.Sleep(10 * time.Second) + rt.Filter.AfterCall(r) + if err == nil { + // 统计请求code + stats.Codes.With(prometheus.Labels{ + "code": strconv.Itoa(resp.StatusCode()), + "api_id": r.Api.APIID, + "service": r.Api.Service, + "label": r.Api.Label, + }).Inc() + break + } + // 统计请求错误 + stats.Errors.With(prometheus.Labels{ + "api_id": r.Api.APIID, + "service": r.Api.Service, + "label": r.Api.Label, + }).Inc() + // 发生错误,进行重试 + if retrys >= r.RetryStrategy.RetryTimes { + break + } + time.Sleep(time.Duration(r.RetryStrategy.RetryInterval) * time.Second) + retrys++ + } + + return resp.StatusCode(), resp.Body(), err +} diff --git a/api/sql_data/.DS_Store b/api/sql_data/.DS_Store new file mode 100644 index 00000000..5008ddfc Binary files /dev/null and b/api/sql_data/.DS_Store differ diff --git a/api/sql_data/juz.sql b/api/sql_data/juz.sql new file mode 100644 index 00000000..ee910ddb --- /dev/null +++ b/api/sql_data/juz.sql @@ -0,0 +1,115 @@ +create database if not exists mafanr_juz; +USE mafanr_juz; + +CREATE TABLE IF NOT EXISTS `api_release` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', + `api_id` varchar(255) NOT NULL COMMENT 'API ID', + `path_type` int(11) DEFAULT '0' COMMENT '是否是路径映射类型,0代表否,1代表是', + `service` varchar(255) NOT NULL COMMENT 'service名', + `description` text COMMENT '介绍', + + `route_type` int(11) NOT NULL DEFAULT '1' COMMENT '代理类型', + `route_addr` varchar(255) NOT NULL COMMENT '后段服务地址', + `route_proto` int(11) DEFAULT '1' COMMENT '后端服务协议', + `mock_data` text COMMENT 'mock类型接口,返回定义的mock数据', + + `retry_strategy` int(11) DEFAULT '0' COMMENT '重试策略ID', + `bw_strategy` int(11) DEFAULT '0' COMMENT '黑白名单策略ID', + `traffic_strategy` int(11) DEFAULT '0' COMMENT '流量控制策略ID', + + `traffic_on` int(11) DEFAULT '0' COMMENT '是否开启流量路由', + `traffic_api` varchar(255) DEFAULT '' COMMENT '指定部分流量路由到该api name', + `traffic_ratio` int(11) DEFAULT '0' COMMENT '被路由的流量占比, 0<=x<=100', + `traffic_ips` varchar(255) DEFAULT '' COMMENT '指定来自哪些ip的流量被路由', + + `verify_on` int(11) DEFAULT '0' COMMENT '是否开启参数验证', + `param_rules` longtext COMMENT '参数验证表', + + + `cached_time` int(11) DEFAULT '0' COMMENT '为0表示不开启,其它值代表缓存的时间', + + `label` varchar(255) DEFAUlT '' COMMENT '标签分组', + + `status` int(11) DEFAULT '1' COMMENT '发布状态,0代表未发布,1代表已发布', + + `create_date` datetime DEFAULT NULL COMMENT '创建时间', + `modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间', + PRIMARY KEY (`id`), + UNIQUE KEY `Index_api_apiid` (`api_id`) USING BTREE, + KEY `Index_api_modifydate` (`modify_date`) USING BTREE, + KEY `Index_api_status` (`status`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT 'api发布表'; + +CREATE TABLE IF NOT EXISTS `api_define` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', + `api_id` varchar(255) NOT NULL COMMENT 'API ID', + `path_type` int(11) DEFAULT '0' COMMENT '是否是路径映射类型,0代表否,1代表是', + `service` varchar(255) NOT NULL COMMENT 'service名', + `description` text COMMENT '介绍', + + `route_type` int(11) NOT NULL DEFAULT '1' COMMENT '代理类型', + `route_addr` varchar(255) NOT NULL COMMENT '后段服务地址', + `route_proto` int(11) DEFAULT '1' COMMENT '后端服务协议', + `mock_data` text COMMENT 'mock类型接口,返回定义的mock数据', + + `retry_strategy` int(11) DEFAULT '0' COMMENT '重试策略ID', + `bw_strategy` int(11) DEFAULT '0' COMMENT '黑白名单策略ID', + `traffic_strategy` int(11) DEFAULT '0' COMMENT '流量控制策略ID', + + `traffic_on` int(11) DEFAULT '0' COMMENT '是否开启流量路由', + `traffic_api` varchar(255) DEFAULT '' COMMENT '指定部分流量路由到该api name', + `traffic_ratio` int(11) DEFAULT '0' COMMENT '被路由的流量占比, 0<=x<=100', + `traffic_ips` varchar(255) DEFAULT '' COMMENT '指定来自哪些ip的流量被路由', + + `verify_on` int(11) DEFAULT '0' COMMENT '是否开启参数验证', + `param_rules` longtext COMMENT '参数验证表', + + + `cached_time` int(11) DEFAULT '0' COMMENT '为0表示不开启,其它值代表缓存的时间', + + `label` varchar(255) DEFAUlT '' COMMENT '标签分组', + + `revise_version` varchar(255) NOT NULL COMMENT 'api内容最新版本号', + `release_version` varchar(255) DEFAULT '' COMMENT '当前已发布的版本号', + + `create_date` datetime DEFAULT NULL COMMENT '创建时间', + `modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间', + PRIMARY KEY (`id`), + UNIQUE KEY `Index_api_apiid` (`api_id`) USING BTREE, + KEY `Index_strategy_modifydate` (`modify_date`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT 'api_define'; + + + + +CREATE TABLE IF NOT EXISTS `strategy` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `name` varchar(255) DEFAULT '' COMMENT 'strategy名', + `service` varchar(255) DEFAULT '' COMMENT 'service名', + `type` int(11) DEFAULT NULL COMMENT '策略类型,1黑白名单;2超时重试等等', + `sub_type` int(11) DEFAULT NULL COMMENT '策略的子类型,一般是和内容相关的类型,例如黑白名单中的黑或者白', + `content` longtext COMMENT '策略的具体内容', + `status` int(11) DEFAULT '1' COMMENT '1开启, 0停用', + `create_date` datetime DEFAULT NULL, + `modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间', + PRIMARY KEY (`id`), + KEY `Index_strategy_service` (`service`) USING BTREE, + UNIQUE KEY `Index_api_strategy_servicename` (`service`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '通用策略表'; + +CREATE TABLE IF NOT EXISTS `audit_log` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `user_id` varchar(255) DEFAULT '' COMMENT 'strategy名', + `service` varchar(255) DEFAULT '' COMMENT 'service名', + `target_type` int(11) DEFAULT '0' COMMENT '目标类型,1:service 2: api 3: 策略', + `target_id` varchar(255) DEFAUlT '' COMMENT '操作的目标id/name', + `op_type` int(11) DEFAULT '0' COMMENT '操作类型:1. 创建 2. 更新 3. 管理 4. 删除', + `content` TEXT COMMENT '具体内容', + `description` TEXT COMMENT '操作备注', + `modify_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间', + PRIMARY KEY (`id`), + KEY `Index_audit_log_service` (`service`) USING BTREE, + KEY `Index_audit_log_tid` (`target_id`) USING BTREE, + KEY `Index_audit_log_type` (`target_type`) USING BTREE, + KEY `Index_audit_log_modifydate` (`modify_date`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '审计日志表'; diff --git a/api/stats/stats.go b/api/stats/stats.go new file mode 100644 index 00000000..0ed231eb --- /dev/null +++ b/api/stats/stats.go @@ -0,0 +1,40 @@ +package stats + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +//ApiID指标项: 请求数、http code分布、错误统计、耗时统计、QPS +//Label指标项:继承ApiID,请求数top n,错误数top n,耗时top n, qps top n +//Service指标项: 继承Label + +var ( + Req = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "req_distribution", + Help: "Request stats", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"api_id", "service", "label"}) + Limits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "req_limits", + Help: "Request blocked", + }, []string{"api_id", "service", "label"}) + Errors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "req_errors", + Help: "Request error", + }, []string{"api_id", "service", "label"}) + Codes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "req_codes", + Help: "Reqeust http code", + }, + []string{"code", "api_id", "service", "label"}, + ) +) + +func init() { + // Metrics have to be registered to be exposed: + prometheus.MustRegister(Req) + prometheus.MustRegister(Limits) + prometheus.MustRegister(Errors) + prometheus.MustRegister(Codes) +} diff --git a/api/test/Reamd.md b/api/test/Reamd.md new file mode 100644 index 00000000..6d10dfa6 --- /dev/null +++ b/api/test/Reamd.md @@ -0,0 +1 @@ +# 集成测试 \ No newline at end of file diff --git a/api/test/http_server.go b/api/test/http_server.go new file mode 100644 index 00000000..f2a98e1a --- /dev/null +++ b/api/test/http_server.go @@ -0,0 +1,34 @@ +package main + +import ( + "flag" + "log" + + "runtime" + + "github.com/valyala/fasthttp" +) + +func main() { + runtime.GOMAXPROCS(1) + flag.Parse() + + h := requestHandler + + if err := fasthttp.ListenAndServe("localhost:10001", h); err != nil { + log.Fatalf("Error in ListenAndServe: %s", err) + } +} + +func requestHandler(ctx *fasthttp.RequestCtx) { + ctx.SetContentType("text/plain; charset=utf8") + + // Set arbitrary headers + ctx.Response.Header.Set("X-My-Header", "my-header-value") + + // Set cookies + var c fasthttp.Cookie + c.SetKey("cookie-name") + c.SetValue("cookie-value") + ctx.Response.Header.SetCookie(&c) +} diff --git a/api/test/黑盒测试用例.txt b/api/test/黑盒测试用例.txt new file mode 100644 index 00000000..764f7830 --- /dev/null +++ b/api/test/黑盒测试用例.txt @@ -0,0 +1,16 @@ +一、代理 +0.通用规则 + + +1. 除非请求业务方返回200,否则tfe不可以返回200 code +1.同步代理 +1)对GET/POST/PUT/DELETE进行测试,Method需要透传 +2)对参数service_id为空、不存在的错误进行验证 +3)被请求方要能看到最初的客户端IP,通过X-Forwarded-For透传 + +2.重定向 +1) 301 +2) 目前只有Get请求的参数会透传 +二、数据加载 +1. 启动时数据全量加载 +2. 更新时,重新加载某一行 \ No newline at end of file diff --git a/cmd/api.go b/cmd/api.go new file mode 100644 index 00000000..4942109d --- /dev/null +++ b/cmd/api.go @@ -0,0 +1,71 @@ +// Copyright © 2018 NAME HERE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "os" + "os/signal" + "strings" + "syscall" + + "github.com/mafanr/juz/misc" + + "github.com/mafanr/juz/api" + + "github.com/mafanr/g" + + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +// apiCmd represents the api command +var apiCmd = &cobra.Command{ + Use: "api", + Short: "api gateway", + Long: ``, + // Uncomment the following line if your bare application + // has an action associated with it: + Run: func(cmd *cobra.Command, args []string) { + misc.InitConfig("juz.conf") + misc.Conf.Common.LogLevel = strings.ToLower(misc.Conf.Common.LogLevel) + g.InitLogger() + g.L.Info("Application version", zap.String("version", misc.Conf.Common.Version)) + + p := &api.ApiServer{} + p.Start() + + // 等待服务器停止信号 + chSig := make(chan os.Signal) + signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM) + + sig := <-chSig + g.L.Info("juz received signal", zap.Any("signal", sig)) + + }, +} + +func init() { + rootCmd.AddCommand(apiCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // apiCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // apiCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 00000000..b458d001 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,55 @@ +// Copyright © 2018 NAME HERE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" +) + +var cfgFile string + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "tfe", + Short: "Api网关", + Long: ``, + // Uncomment the following line if your bare application + // has an action associated with it: + Run: func(cmd *cobra.Command, args []string) { + }, +} + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + // Here you will define your flags and configuration settings. + // Cobra supports persistent flags, which, if defined here, + // will be global for your application. + // rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.tfe.yaml)") + + // Cobra also supports local flags, which will only run + // when this action is called directly. + // rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/cmd/traffic.go b/cmd/traffic.go new file mode 100644 index 00000000..98ce4a9c --- /dev/null +++ b/cmd/traffic.go @@ -0,0 +1,67 @@ +// Copyright © 2018 NAME HERE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "os" + "os/signal" + "strings" + "syscall" + + "github.com/mafanr/juz/misc" + "github.com/mafanr/juz/traffic" + + "github.com/mafanr/g" + + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +// trafficCmd represents the traffic command +var trafficCmd = &cobra.Command{ + Use: "traffic", + Short: "traffic control center", + Long: ``, + Run: func(cmd *cobra.Command, args []string) { + misc.InitConfig("juz.conf") + misc.Conf.Common.LogLevel = strings.ToLower(misc.Conf.Common.LogLevel) + g.InitLogger() + g.L.Info("Application version", zap.String("version", misc.Conf.Common.Version)) + + p := &traffic.Traffic{} + p.Start() + + // 等待服务器停止信号 + chSig := make(chan os.Signal) + signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM) + + sig := <-chSig + g.L.Info("juz received signal", zap.Any("signal", sig)) + }, +} + +func init() { + rootCmd.AddCommand(trafficCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // trafficCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // trafficCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/juz.conf b/juz.conf new file mode 100644 index 00000000..553b837c --- /dev/null +++ b/juz.conf @@ -0,0 +1,33 @@ +common: + version: 0.0.1 + loglevel: warn + admintoken: "juz.io" + +api: + port: "8081" + serverid: 1 + +manage: + port: "8089" + +mysql: + # addr: 10.7.13.48 + # port: 8066 + # database: tfe + # acc: tfe + # pw: EUnt7sbiRzYzpRLz8S21 + addr: localhost + port: 3306 + database: mafanr_juz + acc: root + pw: + +etcd: + addrs: + - "localhost:2379" + # - "10.7.24.191:2379" + # - "10.7.24.191:2379" + +traffic: + host: "127.0.0.1" + port: "8088" diff --git a/main.go b/main.go new file mode 100644 index 00000000..adc550e1 --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +// Copyright © 2018 NAME HERE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import "github.com/mafanr/juz/cmd" + +func main() { + cmd.Execute() +} diff --git a/misc/config.go b/misc/config.go new file mode 100644 index 00000000..4ba74174 --- /dev/null +++ b/misc/config.go @@ -0,0 +1,69 @@ +// Copyright © 2018 Sunface +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package misc + +import ( + "io/ioutil" + "log" + + "gopkg.in/yaml.v2" +) + +type Config struct { + Common struct { + Version string + LogLevel string + AdminToken string + } + Api struct { + Port string + ServerID int64 + } + Manage struct { + Port string + } + Mysql struct { + Addr string + Port string + Database string + Acc string + Pw string + } + Etcd struct { + Addrs []string + } + + Traffic struct { + Host string + Port string + } +} + +var Conf *Config + +func InitConfig(path string) { + conf := &Config{} + data, err := ioutil.ReadFile(path) + if err != nil { + log.Fatal("read config error :", err) + } + + err = yaml.Unmarshal(data, &conf) + if err != nil { + log.Fatal("yaml decode error :", err) + } + + Conf = conf +} diff --git a/misc/const.go b/misc/const.go new file mode 100644 index 00000000..a31b4b91 --- /dev/null +++ b/misc/const.go @@ -0,0 +1,58 @@ +package misc + +const TEST_API_NAME = "devops.test.get.v1" + +// 默认请求策略 +const ( + REQ_TIMEOUT = 15 + RETRY_TIMES = 0 + RETRY_INTERVAL = 5 +) + +const ( + PRIVILEGE_SUPER_ADMIN = 0 + PRIVILEGE_ADMIN = 1 + PRIVILEGE_NORMAL = 2 + PRIVILEGE_VIEWER = 3 +) + +// 通用的 +const ( + ON = 1 + OFF = 0 +) + +// 以下值绝对不可更改,前端也在使用 +const ( + STRATEGY_NO_LIMIT = 0 + + BW_OFF = 0 + BLACK_LIST = 1 + WHITE_LIST = 2 + + IP_TYPE = 1 + PARAM_TYPE = 2 + + TRAFFIC_ON = 1 + TRAFFIC_OFF = 0 + + PARAM_VERIFY_ON = 1 + PARAM_VERIFY_OFF = 0 + + API_RELEASED = 1 + API_OFFLINE = 0 + + STRATEGY_ALL = -1 + STRATEGY_EMPTY = 0 + STRATEGY_BWLIST = 1 + STRATEGY_RETRY = 2 + STRATEGY_TRAFFIC = 3 + + STRATEGY_ON = 1 + STRATEGY_OFF = 0 +) + +// redis后缀 +const ( + TRAFFIC_CONCURRENT = ".cr" +) diff --git a/misc/data.go b/misc/data.go new file mode 100644 index 00000000..00de1377 --- /dev/null +++ b/misc/data.go @@ -0,0 +1,140 @@ +package misc + +import ( + "fmt" + "sync" + + "github.com/mafanr/g" + + "github.com/jmoiron/sqlx" + "go.uber.org/zap" +) + +type API struct { + ID int `db:"id" json:"id"` + // 基本设置 + Service string `db:"service" json:"service"` + APIID string `db:"api_id" json:"api_id"` + PathType int `db:"path_type" json:"path_type"` + Desc *string `db:"description" json:"desc"` + + RouteType int `db:"route_type" json:"route_type"` + RouteAddr string `db:"route_addr" json:"route_addr"` + RouteProto int `db:"route_proto" json:"route_proto"` + MockData *string `db:"mock_data" json:"mock_data"` + + // 通用策略 + RetryStrategy int `db:"retry_strategy" json:"retry_strategy"` + BwStrategy int `db:"bw_strategy" json:"bw_strategy"` + TrafficStrategy int `db:"traffic_strategy" json:"traffic_strategy"` + + // 流量路由 + TrafficOn int `db:"traffic_on" json:"traffic_on"` + TrafficAPI string `db:"traffic_api" json:"traffic_api"` + TrafficRatio int `db:"traffic_ratio" json:"traffic_ratio"` + TrafficIPs string `db:"traffic_ips" json:"traffic_ips"` + + // 参数验证 + VerifyOn int `db:"verify_on" json:"verify_on"` + ParamTable *string `db:"param_rules" json:"param_rules"` + + // 缓存 + CachedTime int `db:"cached_time" json:"cached_time"` + + // 标签分组 + Label string `db:"label" json:"label"` + + // API修订的版本号 + ReviseVersion string `db:"revise_version" json:"revise_version"` + ReleaseVersion string `db:"release_version" json:"release_version"` + + Status int `db:"status" json:"status"` + + // 日期相关 + CreateDate string `db:"create_date" json:"create_date"` + ModifyDate string `db:"modify_date" json:"modify_date"` + + ParamRules *sync.Map +} + +type Service struct { + ID int `db:"id"` + Name string `db:"name"` + Creator string `db:"creator"` + CreateDate string `db:"create_date"` + ModifyDate string `db:"modify_date"` +} +type BW struct { + Type int `json:"type"` + Key string `json:"key"` + Val string `json:"val"` +} + +type ParamRule struct { + Param string `json:"param"` + ParamRule string `json:"rule"` + TestData string `json:"test_data"` +} + +type Strategy struct { + ID int `db:"id" json:"id"` + Name string `db:"name" json:"name"` + Service string `db:"service" json:"service"` + Type int `db:"type" json:"type"` + SubType int `db:"sub_type" json:"sub_type"` + Content string `db:"content" json:"content"` + Status int `db:"status" json:"status"` + CreateDate string `db:"create_date" json:"create_date"` + ModifyDate string `db:"modify_date" json:"modify_date"` + + DetailContent interface{} `json:"-"` // 把content翻译成具体的策略语言 +} + +type BwStrategy struct { + Type int // 黑 or 白 + BwList []*BW +} +type RetryStrategy struct { + ReqTimeout int `json:"req_timeout"` + RetryTimes int `json:"retry_times"` + RetryInterval int `json:"retry_interval"` +} + +type TrafficStrategy struct { + // 接口流量 + QPS int `json:"qps"` + Concurrent int `json:"concurrent"` + + // 用户流量 + Param string `json:"param"` // 限制参数 + Span int `json:"span"` // 限定时间 + Times int `json:"times"` // 限定次数 + + // 熔断设置 + FuseError int `json:"fuse_error"` // 熔断错误率,大于该值时,触发熔断保护 + FuseErrorCount int `json:"fuse_error_count"` // 熔断触发的最小请求次数 + FusePercent int `json:"fuse_percent"` // 熔断触发后,允许访问的百分比,例如100次请求,只有50次允许通过 + FuseRecover int `json:"fuse_recover"` // 熔断错误率小于该值时,取消熔断保护 + FuseRecoverCount int `json:"fuse_recover_count"` // 熔断恢复的最小请求次数 +} + +var Apis = &sync.Map{} +var Strategies = &sync.Map{} + +func InitMysql() { + var err error + + // 初始化mysql连接 + sqlConn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", Conf.Mysql.Acc, Conf.Mysql.Pw, + Conf.Mysql.Addr, Conf.Mysql.Port, Conf.Mysql.Database) + g.DB, err = sqlx.Open("mysql", sqlConn) + if err != nil { + g.L.Fatal("init mysql error", zap.Error(err)) + } + + // 测试db是否正常 + err = g.DB.Ping() + if err != nil { + g.L.Fatal("init mysql, ping error", zap.Error(err)) + } +} diff --git a/misc/rpc.go b/misc/rpc.go new file mode 100644 index 00000000..6e317e0c --- /dev/null +++ b/misc/rpc.go @@ -0,0 +1,11 @@ +package misc + +type TrafficConReq struct { + ApiName string + StrategyID int + ParamVal string // 用户流量限定中对应的参数值,例如限定了mobile,则对应的值为15880261185 +} +type TrafficConRes struct { + Suc bool + Error string +} diff --git a/traffic/load_data.go b/traffic/load_data.go new file mode 100644 index 00000000..93dab2a9 --- /dev/null +++ b/traffic/load_data.go @@ -0,0 +1,70 @@ +package traffic + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "github.com/sunface/talent" + "go.uber.org/zap" +) + +func (t *Traffic) loadData() { + lastLoadTime = time.Now() + // 加载所有数据 + t.loadAll() + + // 定时加载最新的信息 + go t.loadUpdated() +} + +var lastLoadTime time.Time + +func (t *Traffic) loadAll() { + // 加载所有strategy + strategies := make([]*misc.Strategy, 0) + err := g.DB.Select(&strategies, "select * from strategy") + if err != nil { + g.L.Fatal("load strategies error!", zap.Error(err)) + } + + for _, s := range strategies { + if s.Type == misc.STRATEGY_TRAFFIC { + ts := &misc.TrafficStrategy{} + json.Unmarshal([]byte(s.Content), &ts) + t.Strategies.Store(s.ID, ts) + } + } +} + +func (t *Traffic) loadUpdated() { + for { + // 为了防止访问时,恰好在更新数据,这里给出2秒的容忍值 + lastT := talent.Time2String(lastLoadTime.Add(-2 * time.Second)) + lastLoadTime = time.Now() + + // 加载策略 + + strategies := make([]*misc.Strategy, 0) + query := fmt.Sprintf("select * from strategy where modify_date >= '%s'", lastT) + err := g.DB.Select(&strategies, query) + if err != nil { + g.L.Error("load strategies error!", zap.Error(err), zap.String("query", query)) + return + } + + for _, s := range strategies { + if s.Type == misc.STRATEGY_TRAFFIC { + ts := &misc.TrafficStrategy{} + json.Unmarshal([]byte(s.Content), &ts) + t.Strategies.Store(s.ID, ts) + } + } + + time.Sleep(10 * time.Second) + } +} diff --git a/traffic/rate_limiter.go b/traffic/rate_limiter.go new file mode 100644 index 00000000..c1e9700c --- /dev/null +++ b/traffic/rate_limiter.go @@ -0,0 +1,137 @@ +// 对API接口的QPS进行限制 +package traffic + +import ( + "sync" + "time" + + "github.com/mafanr/juz/misc" +) + +var apiRates = &sync.Map{} + +type apiRate struct { + concurrent int + conUpdate time.Time + + qps int + qpsUpdate time.Time + + params *sync.Map +} + +type param struct { + times int + lastTime time.Time +} + +// 能调用此函数,说明qps、并发至少有一项受到了限制 +func (rl *RateLimiter) IncApiRate(req misc.TrafficConReq, reply *misc.TrafficConRes) error { + reply.Suc = true + + si, ok := traffic.Strategies.Load(req.StrategyID) + if !ok { + // 不存在策略,就没有限制 + return nil + } + s := si.(*misc.TrafficStrategy) + + ari, ok := apiRates.Load(req.ApiName) + now := time.Now() + if !ok { + // 之前没有数据,初始化数据 + ar := apiRate{1, now, 1, now, &sync.Map{}} + apiRates.Store(req.ApiName, &ar) + return nil + } + ar := ari.(*apiRate) + + // QPS限制 + if s.QPS != misc.STRATEGY_NO_LIMIT { + //要检查一下时间,如果超过一秒,则重新初始化数据 + if now.Sub(ar.qpsUpdate) > 1e9 { + ar.qps = 1 + ar.qpsUpdate = now + return nil + } + + // 没超过一秒,超出限制,返回失败 + if ar.qps >= s.QPS { + reply.Suc = false + return nil + } + + // 没超过一秒,每超出,更新数据 + ar.qps++ + } + + // 对并发进行限制 + if s.Concurrent != misc.STRATEGY_NO_LIMIT { + // 这里要做一个防BUG设置,防止某些情况下,并发数到了最大值,结果不能减少或者归零 + if ar.concurrent >= s.Concurrent { + // 如果当前时间距离上次更新并发时间,大于5秒,则认为出现了bug,重置并发 + if now.Sub(ar.conUpdate) > 1e9 { + ar.concurrent = 0 + ar.conUpdate = now + return nil + } + + // 当前数量已经超过了并发数 + reply.Suc = false + return nil + } + + ar.concurrent++ + ar.conUpdate = now + } + + // 用户流量,访问次数限制 + if req.ParamVal != "" { + pi, ok := ar.params.Load(req.ParamVal) + if !ok { + // 不存在记录,初始化 + ar.params.Store(req.ParamVal, ¶m{1, now}) + return nil + } + + // 存在记录 + p := pi.(*param) + // 判断当前时间和上次初始化的时间的跨度 + if now.Sub(p.lastTime) > time.Duration(s.Span)*1e9 { + // 时间跨度已经超出,重新初始化 + p.times = 1 + p.lastTime = now + return nil + } + + // 还在限制时间范围内,判断请求次数是否超出 + if p.times >= s.Times { + reply.Suc = false + return nil + } + p.times++ + } + return nil +} + +func (rl *RateLimiter) DecApiRate(req misc.TrafficConReq, reply *misc.TrafficConRes) error { + reply.Suc = true + // 获取该api之前的计数 + ari, ok := apiRates.Load(req.ApiName) + if !ok { + return nil + } + + ar := ari.(*apiRate) + if ar.concurrent == 0 { + return nil + } + if ar.concurrent > 0 && ar.concurrent-1 <= 0 { + ar.concurrent = 0 + return nil + } + + ar.concurrent-- + + return nil +} diff --git a/traffic/traffic.go b/traffic/traffic.go new file mode 100644 index 00000000..6a787e51 --- /dev/null +++ b/traffic/traffic.go @@ -0,0 +1,71 @@ +package traffic + +import ( + "net" + "net/rpc" + "sync" + + "github.com/mafanr/juz/misc" + + "github.com/mafanr/g" + + "go.uber.org/zap" +) + +var traffic *Traffic + +type Traffic struct { + Strategies *sync.Map +} + +func (t *Traffic) Start() { + // 初始化mysql连接 + misc.InitMysql() + + // 加载配置数据 + t.Strategies = &sync.Map{} + t.loadData() + + // 启动rpc服务 + t.startRpcServer() + + traffic = t +} + +func (t *Traffic) Shutdown() { + g.L.Info("shutdown tfe..") +} + +type RateLimiter struct{} + +// 请不要修改该函数,用来测试rpc是否存活 +func (rl *RateLimiter) Ping(req int, reply *int) error { + *reply = 1 + return nil +} + +func (t *Traffic) startRpcServer() { + rl := new(RateLimiter) + server := rpc.NewServer() + err := server.Register(rl) + if err != nil { + g.L.Fatal("register error", zap.Error(err)) + } + + g.L.Info("Listen tcp on port", zap.String("port", misc.Conf.Traffic.Port)) + l, err := net.Listen("tcp", ":"+misc.Conf.Traffic.Port) + if err != nil { + g.L.Fatal("listen error", zap.Error(err)) + } + + go func() { + for { + conn, err := l.Accept() + if err != nil { + g.L.Error("accept error", zap.Error(err)) + continue + } + server.ServeConn(conn) + } + }() +}