179 lines
4.3 KiB
Go
179 lines
4.3 KiB
Go
package http
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io/ioutil"
|
|
"net/http"
|
|
|
|
"go-common/app/infra/canal/conf"
|
|
"go-common/app/infra/canal/infoc"
|
|
config "go-common/library/conf"
|
|
"go-common/library/ecode"
|
|
bm "go-common/library/net/http/blademaster"
|
|
|
|
"github.com/BurntSushi/toml"
|
|
"github.com/siddontang/go-mysql/canal"
|
|
)
|
|
|
|
const (
|
|
_heartHeat = 60
|
|
_readTimeout = 90
|
|
_flavor = "mysql"
|
|
_updateUser = "canal"
|
|
_updateMark = "infoc"
|
|
)
|
|
|
|
// InfocConf .
|
|
type infocConf struct {
|
|
Addr string `json:"db_addr"`
|
|
User string `json:"user"`
|
|
Pass string `json:"pass"`
|
|
InfocDBs []*infocDB `json:"databases"`
|
|
}
|
|
|
|
// InfocDB .
|
|
type infocDB struct {
|
|
Schema string `json:"schema"`
|
|
Tables []*infoTable `json:"tables"`
|
|
LancerAddr string `json:"lancer_addr"`
|
|
LancerTaskID string `json:"lancer_task_id"`
|
|
LancerReportAddr string `json:"lancer_report_addr"`
|
|
Proto string `json:"proto"`
|
|
}
|
|
|
|
// InfoTable .
|
|
type infoTable struct {
|
|
Name string `json:"name"`
|
|
OmitFlied []string `json:"omit_field"`
|
|
OmitAction []string `json:"omit_action"`
|
|
}
|
|
|
|
func infocPost(c *bm.Context) {
|
|
var (
|
|
ics []*infocConf
|
|
bs []byte
|
|
err error
|
|
buf *bytes.Buffer
|
|
)
|
|
content := make(map[string]string)
|
|
if bs, err = ioutil.ReadAll(c.Request.Body); err != nil {
|
|
c.AbortWithStatus(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if err = json.Unmarshal(bs, &ics); err != nil {
|
|
c.AbortWithStatus(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
for _, ifc := range ics {
|
|
databases := make([]*conf.Database, len(ifc.InfocDBs))
|
|
for idx, infocDB := range ifc.InfocDBs {
|
|
tables := make([]*conf.CTable, len(infocDB.Tables))
|
|
for ix, table := range infocDB.Tables {
|
|
tables[ix] = &conf.CTable{
|
|
Name: table.Name,
|
|
OmitAction: table.OmitAction,
|
|
OmitField: table.OmitFlied,
|
|
}
|
|
}
|
|
databases[idx] = &conf.Database{
|
|
Schema: infocDB.Schema,
|
|
Infoc: &infoc.Config{
|
|
TaskID: infocDB.LancerTaskID,
|
|
Addr: infocDB.LancerAddr,
|
|
ReporterAddr: infocDB.LancerReportAddr,
|
|
Proto: infocDB.Proto,
|
|
},
|
|
CTables: tables,
|
|
}
|
|
}
|
|
ic := &conf.InsConf{
|
|
Databases: databases,
|
|
Config: &canal.Config{
|
|
Addr: ifc.Addr,
|
|
User: ifc.User,
|
|
Password: ifc.Pass,
|
|
ServerID: crc32.ChecksumIEEE([]byte(ifc.Addr)),
|
|
Flavor: _flavor,
|
|
HeartbeatPeriod: _heartHeat,
|
|
ReadTimeout: _readTimeout,
|
|
},
|
|
}
|
|
var isc = &struct {
|
|
InsConf *conf.InsConf `toml:"instance"`
|
|
}{
|
|
InsConf: ic,
|
|
}
|
|
buf = new(bytes.Buffer)
|
|
if err = toml.NewEncoder(buf).Encode(isc); err != nil {
|
|
c.AbortWithStatus(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
content[fmt.Sprintf("%v.toml", ifc.Addr)] = buf.String()
|
|
}
|
|
for cn, cv := range content {
|
|
value, err := conf.ConfClient.ConfIng(cn)
|
|
if err == nil {
|
|
err = conf.ConfClient.Update(value.CID, cv, _updateUser, _updateMark)
|
|
} else if err == ecode.NothingFound {
|
|
err = conf.ConfClient.Create(cn, cv, _updateUser, _updateMark)
|
|
}
|
|
if err != nil {
|
|
c.AbortWithStatus(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func infocCurrent(c *bm.Context) {
|
|
var (
|
|
ok bool
|
|
result []*config.Value
|
|
)
|
|
if result, ok = conf.ConfClient.Configs(); !ok {
|
|
c.Status(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
ics := make([]*infocConf, 0, len(result))
|
|
for _, ns := range result {
|
|
var ic struct {
|
|
InsConf *conf.InsConf `toml:"instance"`
|
|
}
|
|
if _, err := toml.Decode(ns.Config, &ic); err != nil {
|
|
c.AbortWithStatus(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if ic.InsConf == nil {
|
|
continue
|
|
}
|
|
icf := &infocConf{
|
|
Addr: ic.InsConf.Addr,
|
|
User: ic.InsConf.User,
|
|
Pass: ic.InsConf.Password,
|
|
}
|
|
for _, icdb := range ic.InsConf.Databases {
|
|
if icdb.Infoc == nil {
|
|
continue
|
|
}
|
|
tables := make([]*infoTable, len(icdb.CTables))
|
|
for idx, ctable := range icdb.CTables {
|
|
tables[idx] = &infoTable{
|
|
Name: ctable.Name,
|
|
OmitFlied: ctable.OmitField,
|
|
OmitAction: ctable.OmitAction,
|
|
}
|
|
}
|
|
icf.InfocDBs = append(icf.InfocDBs, &infocDB{
|
|
Schema: icdb.Schema,
|
|
Tables: tables,
|
|
LancerAddr: icdb.Infoc.Addr,
|
|
LancerTaskID: icdb.Infoc.TaskID,
|
|
})
|
|
}
|
|
ics = append(ics, icf)
|
|
}
|
|
c.JSON(ics, nil)
|
|
}
|