You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
3.0 KiB

7 years ago
package api
import (
"net/http"
"net/rpc"
"time"
"github.com/mafanr/juz/api/filter"
"github.com/mafanr/juz/api/manage"
"github.com/mafanr/juz/api/stats"
"github.com/mafanr/juz/misc"
"github.com/mafanr/g"
_ "github.com/go-sql-driver/mysql"
"github.com/labstack/echo"
7 years ago
"github.com/labstack/echo/middleware"
7 years ago
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
type ApiServer struct {
manage *manage.Manage
router *router
}
func (p *ApiServer) Start() {
7 years ago
g.L.Info("start tfe..")
7 years ago
7 years ago
// 获取所有内部服务节点信息
g.ETCD.QueryAll(misc.Conf.Etcd.Addrs)
7 years ago
// 初始化mysql连接
misc.InitMysql()
// 从mysql中加载所有的api信息到内存中
7 years ago
p.loadData()
7 years ago
p.manage = &manage.Manage{}
go p.manage.Start()
p.router = &router{p, &filter.Filter{}}
// 连接到traffic rpc服务
p.initTraffic()
// 启动proxy http服务
go p.listen()
// 启动metrics收集
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":6062", nil)
}
func (o *ApiServer) Shutdown() {
7 years ago
g.L.Info("shutdown tfe..")
7 years ago
}
func (p *ApiServer) listen() {
e := echo.New()
7 years ago
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
AllowHeaders: append([]string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept}, misc.Conf.Api.Cors...),
AllowCredentials: true,
}))
7 years ago
// 回调相关
//同步回调接口
e.Any("/*", p.router.route, timing)
e.Logger.Fatal(e.Start(":" + misc.Conf.Api.Port))
}
func timing(f echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
ts := time.Now()
rid := (ts.UnixNano()/10)*10 + misc.Conf.Api.ServerID
7 years ago
g.L.Info("New request accepted", zap.Int64("rid", rid), zap.String("ip", c.RealIP()))
7 years ago
c.Set("rid", rid)
defer func() {
// 统计请求指标
apiID := c.Get("api_id").(string)
service := c.Get("service").(string)
7 years ago
label := c.Get("app").(string)
7 years ago
stats.Req.With(prometheus.Labels{
"api_id": apiID,
"service": service,
7 years ago
"app": label,
7 years ago
}).Observe(float64(time.Now().Sub(ts).Nanoseconds() / 1e6))
err := c.Get("error_msg")
if err == nil {
7 years ago
g.L.Info("Request success", zap.Int64("rid", rid))
7 years ago
} else {
7 years ago
g.L.Info("Request failed", zap.Int64("rid", rid), zap.Error(err.(error)))
7 years ago
}
}()
return f(c)
}
}
func (as *ApiServer) initTraffic() {
r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port)
if err != nil {
7 years ago
g.L.Fatal("connect to raffic error", zap.Error(err))
7 years ago
}
as.router.Filter.Rpc = r
// 定时检测rpc连接的存活性
go func() {
for {
var res int
err := as.router.Filter.Rpc.Call("RateLimiter.Ping", 1, &res)
if err != nil || res != 1 {
7 years ago
g.L.Warn("rpc ping failed", zap.Error(err))
7 years ago
r, err := rpc.Dial("tcp", misc.Conf.Traffic.Host+":"+misc.Conf.Traffic.Port)
if err != nil {
7 years ago
g.L.Warn("re-connect to traffic error", zap.Error(err))
7 years ago
time.Sleep(2 * time.Second)
continue
}
as.router.Filter.Rpc = r
7 years ago
g.L.Info("re-connect to traffic ok")
7 years ago
}
time.Sleep(3 * time.Second)
}
}()
}