bilibili-backup/app/infra/canal/conf/canal_conf.go

311 lines
7.5 KiB
Go
Raw Permalink Normal View History

2019-04-22 10:59:20 +08:00
package conf
import (
"fmt"
"io/ioutil"
"net"
"regexp"
"strings"
"go-common/app/infra/canal/infoc"
"go-common/library/conf"
"go-common/library/log"
"go-common/library/queue/databus"
xtime "go-common/library/time"
"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/canal"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/mysql"
)
var (
// config change event
event = make(chan *InsConf, 1)
hbaseEvent = make(chan *HBaseInsConf, 1)
tidbEvent = make(chan *TiDBInsConf, 1)
canalPath string
)
// Addition addition attrbute of canal.
type Addition struct {
PrimaryKey []string `toml:"primarykey"` // kafka msg key
OmitField []string `toml:"omitfield"` // field will be ignored in table
}
// CTable canal table.
type CTable struct {
PrimaryKey []string `toml:"primarykey"` // kafka msg key
OmitField []string `toml:"omitfield"` // field will be ignored in table
OmitAction []string `toml:"omitaction"` // action will be ignored in table
Name string `toml:"name"` // table name support regular expression
Tables []string
}
// Database represent mysql db
type Database struct {
Schema string `toml:"schema"`
Databus *databus.Config `toml:"databus"`
Infoc *infoc.Config `toml:"infoc"`
CTables []*CTable `toml:"table"`
TableMap map[string]*Addition
}
// CheckTable check database tables.
func (db *Database) CheckTable(addr, user, passwd string) (err error) {
var (
conn *client.Conn
res *mysql.Result
regex *regexp.Regexp
table string
)
db.TableMap = make(map[string]*Addition)
if conn, err = client.Connect(addr, user, passwd, db.Schema); err != nil {
return
}
defer conn.Close()
if res, err = conn.Execute(fmt.Sprintf("SHOW TABLES FROM `%s`", db.Schema)); err != nil {
log.Error("conn.Execute() error(%v)", err)
return
}
for _, ctable := range db.CTables {
if regex, err = regexp.Compile(ctable.Name); err != nil {
log.Error("regexp.Compile(%s) error(%v)", ctable.Name, err)
return
}
for _, value := range res.Values {
table = fmt.Sprintf("%s", value[0])
if regex.MatchString(table) {
db.TableMap[table] = &Addition{
PrimaryKey: ctable.PrimaryKey,
OmitField: ctable.OmitField,
}
ctable.Tables = append(ctable.Tables, table)
}
}
if len(ctable.Tables) == 0 {
return fmt.Errorf("addr(%s) db(%s) subscribles nothing,table(%s) is empty", addr, db.Schema, ctable.Name)
}
}
return
}
// InsConf instance config
type InsConf struct {
*canal.Config
MonitorPeriod xtime.Duration `toml:"monitor_period"`
MonitorOff bool `toml:"monitor_off"`
Databases []*Database `toml:"db"`
MasterInfo *MasterInfoConfig `toml:"masterinfo"`
}
// HBaseTable hbase canal table.
type HBaseTable struct {
Name string `toml:"name"` // table name
OmitField []string `toml:"omitfield"` // field will be ignored in table
}
// HBaseDatabase hbase database.
type HBaseDatabase struct {
Tables []*HBaseTable `toml:"table"`
Databus *databus.Config `toml:"databus"`
}
// HBaseInsConf hbase instance config.
type HBaseInsConf struct {
Cluster string
Root string
Addrs []string
MonitorPeriod xtime.Duration `toml:"monitor_period"`
MonitorOff bool `toml:"monitor_off"`
Databases []*HBaseDatabase `toml:"db"`
MasterInfo *MasterInfoConfig `toml:"masterinfo"`
}
// CanalConfig config struct
type CanalConfig struct {
Instances []*InsConf `toml:"instance"`
HBaseInstances []*HBaseInsConf `toml:"hbase_instance"`
TiDBInstances []*TiDBInsConf `toml:"tidb_instance"`
}
func newInsConf(fn, fc string) (c *InsConf, err error) {
var ic struct {
InsConf *InsConf `toml:"instance"`
}
ipPort := strings.TrimSuffix(fn, ".toml")
if _, _, err = net.SplitHostPort(ipPort); err != nil {
return
}
if _, err = toml.Decode(fc, &ic); err != nil {
return
}
if ic.InsConf == nil {
err = fmt.Errorf("file(%s) cannot decode toml", fn)
return
}
if ic.InsConf.Addr != ipPort {
err = fmt.Errorf("file(%s) name not equal addr(%s)", fn, ic.InsConf.Addr)
return
}
if ic.InsConf.MasterInfo == nil {
ic.InsConf.MasterInfo = Conf.MasterInfo
}
return ic.InsConf, nil
}
func newHBaseConf(fn, fc string) (c *HBaseInsConf, err error) {
var ic struct {
InsConf *HBaseInsConf `toml:"instance"`
}
if _, err = toml.Decode(fc, &ic); err != nil {
return
}
if ic.InsConf == nil {
err = fmt.Errorf("file(%s) cannot decode toml", fn)
return
}
cluster := strings.TrimSuffix(fn, ".hbase.toml")
if ic.InsConf.Cluster != cluster {
err = fmt.Errorf("file(%s) name not equal name(%s)", cluster, ic.InsConf.Cluster)
return
}
if ic.InsConf.MasterInfo == nil {
ic.InsConf.MasterInfo = Conf.MasterInfo
}
return ic.InsConf, nil
}
// LoadCanalConf load canal config.
func LoadCanalConf() (c *CanalConfig, err error) {
var (
result []*conf.Value
ok bool
)
if canalPath != "" {
result, err = localCanal()
} else {
result, ok = ConfClient.Configs()
if !ok {
panic("no canal-config")
}
}
c = new(CanalConfig)
im := map[string]struct{}{}
for _, ns := range result {
if ns.Name == "canal.toml" || ns.Name == "common.toml" {
continue
}
if strings.HasSuffix(ns.Name, ".hbase.toml") {
var ic *HBaseInsConf
if ic, err = newHBaseConf(ns.Name, ns.Config); err != nil {
err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
return
}
c.HBaseInstances = append(c.HBaseInstances, ic)
} else if strings.HasSuffix(ns.Name, ".tidb.toml") {
var ic *TiDBInsConf
if ic, err = newTiDBConf(ns.Name, ns.Config); err != nil {
err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
return
}
c.TiDBInstances = append(c.TiDBInstances, ic)
} else {
var ic *InsConf
if !strings.HasSuffix(ns.Name, ".toml") {
err = fmt.Errorf("file(%s) name is not a toml", ns.Name)
continue
}
if ic, err = newInsConf(ns.Name, ns.Config); err != nil {
err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
return
}
if _, ok := im[ic.Addr]; ok {
err = fmt.Errorf("file(%s) repeat with other toml", ns.Name)
return
}
im[ic.Addr] = struct{}{}
c.Instances = append(c.Instances, ic)
}
}
if canalPath == "" {
go func() {
for name := range ConfClient.Event() {
log.Info("config(%s) reload", name)
reloadConfig(name)
}
}()
}
return
}
func localCanal() (vs []*conf.Value, ok error) {
fs, err := ioutil.ReadDir(canalPath)
if err != nil {
panic(err)
}
for _, f := range fs {
if !strings.HasSuffix(f.Name(), ".toml") {
continue
}
ct, err := ioutil.ReadFile(canalPath + f.Name())
if err != nil {
continue
}
vs = append(vs, &conf.Value{
Name: f.Name(),
Config: string(ct),
})
}
return
}
// Event returns config change event chan,
func Event() chan *InsConf {
return event
}
// HBaseEvent returns config change event chan,
func HBaseEvent() chan *HBaseInsConf {
return hbaseEvent
}
func reloadConfig(name string) {
var (
cf string
ok bool
)
if name == "canal.toml" || name == "common.toml" {
LoadCanal()
return
}
if !strings.HasSuffix(name, ".toml") {
return
}
if cf, ok = ConfClient.Value(name); !ok {
// TODO(felix): auto reload? or restart hard?
return
}
if strings.HasSuffix(name, ".hbase.toml") {
ic, err := newHBaseConf(name, cf)
if err != nil {
return
}
hbaseEvent <- ic
} else if strings.HasSuffix(name, ".tidb.toml") {
ic, err := newTiDBConf(name, cf)
if err != nil {
return
}
tidbEvent <- ic
} else {
ic, err := newInsConf(name, cf)
if err != nil {
return
}
event <- ic
}
}