// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
	"strings"
	"sync/atomic"

	"github.com/juju/errors"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/model"
	"github.com/pingcap/tidb/mysql"
	"github.com/pingcap/tidb/perfschema"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/terror"
	// import table implementation to init table.TableFromMeta
	_ "github.com/pingcap/tidb/table/tables"
	"github.com/pingcap/tidb/util/types"
)

// InfoSchema is the interface used to retrieve the schema information.
// It works as a in memory cache and doesn't handle any schema change.
// InfoSchema is read-only, and the returned value is a copy.
// TODO: add more methods to retrieve tables and columns.
type InfoSchema interface {
	SchemaByName(schema model.CIStr) (*model.DBInfo, bool)
	SchemaExists(schema model.CIStr) bool
	TableByName(schema, table model.CIStr) (table.Table, error)
	TableExists(schema, table model.CIStr) bool
	ColumnByName(schema, table, column model.CIStr) (*model.ColumnInfo, bool)
	ColumnExists(schema, table, column model.CIStr) bool
	IndexByName(schema, table, index model.CIStr) (*model.IndexInfo, bool)
	SchemaByID(id int64) (*model.DBInfo, bool)
	TableByID(id int64) (table.Table, bool)
	AllocByID(id int64) (autoid.Allocator, bool)
	ColumnByID(id int64) (*model.ColumnInfo, bool)
	ColumnIndicesByID(id int64) ([]*model.IndexInfo, bool)
	AllSchemaNames() []string
	AllSchemas() []*model.DBInfo
	Clone() (result []*model.DBInfo)
	SchemaTables(schema model.CIStr) []table.Table
	SchemaMetaVersion() int64
}

// Infomation Schema Name.
const (
	Name = "INFORMATION_SCHEMA"
)

type infoSchema struct {
	schemaNameToID  map[string]int64
	tableNameToID   map[tableName]int64
	columnNameToID  map[columnName]int64
	schemas         map[int64]*model.DBInfo
	tables          map[int64]table.Table
	tableAllocators map[int64]autoid.Allocator
	columns         map[int64]*model.ColumnInfo
	indices         map[indexName]*model.IndexInfo
	columnIndices   map[int64][]*model.IndexInfo

	// We should check version when change schema.
	schemaMetaVersion int64
}

var _ InfoSchema = (*infoSchema)(nil)

type tableName struct {
	schema string
	table  string
}

type columnName struct {
	tableName
	name string
}

type indexName struct {
	tableName
	name string
}

func (is *infoSchema) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) {
	id, ok := is.schemaNameToID[schema.L]
	if !ok {
		return
	}
	val, ok = is.schemas[id]
	return
}

func (is *infoSchema) SchemaMetaVersion() int64 {
	return is.schemaMetaVersion
}

func (is *infoSchema) SchemaExists(schema model.CIStr) bool {
	_, ok := is.schemaNameToID[schema.L]
	return ok
}

func (is *infoSchema) TableByName(schema, table model.CIStr) (t table.Table, err error) {
	id, ok := is.tableNameToID[tableName{schema: schema.L, table: table.L}]
	if !ok {
		return nil, TableNotExists.Gen("table %s.%s does not exist", schema, table)
	}
	t = is.tables[id]
	return
}

func (is *infoSchema) TableExists(schema, table model.CIStr) bool {
	_, ok := is.tableNameToID[tableName{schema: schema.L, table: table.L}]
	return ok
}

func (is *infoSchema) ColumnByName(schema, table, column model.CIStr) (val *model.ColumnInfo, ok bool) {
	id, ok := is.columnNameToID[columnName{tableName: tableName{schema: schema.L, table: table.L}, name: column.L}]
	if !ok {
		return
	}
	val, ok = is.columns[id]
	return
}

func (is *infoSchema) ColumnExists(schema, table, column model.CIStr) bool {
	_, ok := is.columnNameToID[columnName{tableName: tableName{schema: schema.L, table: table.L}, name: column.L}]
	return ok
}

func (is *infoSchema) IndexByName(schema, table, index model.CIStr) (val *model.IndexInfo, ok bool) {
	val, ok = is.indices[indexName{tableName: tableName{schema: schema.L, table: table.L}, name: index.L}]
	return
}

func (is *infoSchema) SchemaByID(id int64) (val *model.DBInfo, ok bool) {
	val, ok = is.schemas[id]
	return
}

func (is *infoSchema) TableByID(id int64) (val table.Table, ok bool) {
	val, ok = is.tables[id]
	return
}

func (is *infoSchema) AllocByID(id int64) (val autoid.Allocator, ok bool) {
	val, ok = is.tableAllocators[id]
	return
}

func (is *infoSchema) ColumnByID(id int64) (val *model.ColumnInfo, ok bool) {
	val, ok = is.columns[id]
	return
}

func (is *infoSchema) ColumnIndicesByID(id int64) (indices []*model.IndexInfo, ok bool) {
	indices, ok = is.columnIndices[id]
	return
}

func (is *infoSchema) AllSchemaNames() (names []string) {
	for _, v := range is.schemas {
		names = append(names, v.Name.O)
	}
	return
}

func (is *infoSchema) AllSchemas() (schemas []*model.DBInfo) {
	for _, v := range is.schemas {
		schemas = append(schemas, v)
	}
	return
}

func (is *infoSchema) SchemaTables(schema model.CIStr) (tables []table.Table) {
	di, ok := is.SchemaByName(schema)
	if !ok {
		return
	}
	for _, ti := range di.Tables {
		tables = append(tables, is.tables[ti.ID])
	}
	return
}

func (is *infoSchema) Clone() (result []*model.DBInfo) {
	for _, v := range is.schemas {
		result = append(result, v.Clone())
	}
	return
}

// Handle handles information schema, including getting and setting.
type Handle struct {
	value atomic.Value
	store kv.Storage
}

// NewHandle creates a new Handle.
func NewHandle(store kv.Storage) *Handle {
	h := &Handle{
		store: store,
	}
	// init memory tables
	initMemoryTables(store)
	initPerfSchema(store)
	return h
}

func initPerfSchema(store kv.Storage) {
	perfHandle = perfschema.NewPerfHandle(store)
}

func genGlobalID(store kv.Storage) (int64, error) {
	var globalID int64
	err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error {
		var err error
		globalID, err = meta.NewMeta(txn).GenGlobalID()
		return errors.Trace(err)
	})
	return globalID, errors.Trace(err)
}

var (
	// Information_Schema
	isDB          *model.DBInfo
	schemataTbl   table.Table
	tablesTbl     table.Table
	columnsTbl    table.Table
	statisticsTbl table.Table
	charsetTbl    table.Table
	collationsTbl table.Table
	filesTbl      table.Table
	defTbl        table.Table
	profilingTbl  table.Table
	nameToTable   map[string]table.Table

	perfHandle perfschema.PerfSchema
)

func setColumnID(meta *model.TableInfo, store kv.Storage) error {
	var err error
	for _, c := range meta.Columns {
		c.ID, err = genGlobalID(store)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

func initMemoryTables(store kv.Storage) error {
	// Init Information_Schema
	var (
		err error
		tbl table.Table
	)
	dbID, err := genGlobalID(store)
	if err != nil {
		return errors.Trace(err)
	}
	nameToTable = make(map[string]table.Table)
	isTables := make([]*model.TableInfo, 0, len(tableNameToColumns))
	for name, cols := range tableNameToColumns {
		meta := buildTableMeta(name, cols)
		isTables = append(isTables, meta)
		meta.ID, err = genGlobalID(store)
		if err != nil {
			return errors.Trace(err)
		}
		err = setColumnID(meta, store)
		if err != nil {
			return errors.Trace(err)
		}
		alloc := autoid.NewMemoryAllocator(dbID)
		tbl, err = createMemoryTable(meta, alloc)
		if err != nil {
			return errors.Trace(err)
		}
		nameToTable[meta.Name.L] = tbl
	}
	schemataTbl = nameToTable[strings.ToLower(tableSchemata)]
	tablesTbl = nameToTable[strings.ToLower(tableTables)]
	columnsTbl = nameToTable[strings.ToLower(tableColumns)]
	statisticsTbl = nameToTable[strings.ToLower(tableStatistics)]
	charsetTbl = nameToTable[strings.ToLower(tableCharacterSets)]
	collationsTbl = nameToTable[strings.ToLower(tableCollations)]

	// CharacterSets/Collations contain static data. Init them now.
	err = insertData(charsetTbl, dataForCharacterSets())
	if err != nil {
		return errors.Trace(err)
	}
	err = insertData(collationsTbl, dataForColltions())
	if err != nil {
		return errors.Trace(err)
	}
	// create db
	isDB = &model.DBInfo{
		ID:      dbID,
		Name:    model.NewCIStr(Name),
		Charset: mysql.DefaultCharset,
		Collate: mysql.DefaultCollationName,
		Tables:  isTables,
	}
	return nil
}

func insertData(tbl table.Table, rows [][]types.Datum) error {
	for _, r := range rows {
		_, err := tbl.AddRecord(nil, r)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

func refillTable(tbl table.Table, rows [][]types.Datum) error {
	err := tbl.Truncate(nil)
	if err != nil {
		return errors.Trace(err)
	}
	return insertData(tbl, rows)
}

// Set sets DBInfo to information schema.
func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) error {
	info := &infoSchema{
		schemaNameToID:    map[string]int64{},
		tableNameToID:     map[tableName]int64{},
		columnNameToID:    map[columnName]int64{},
		schemas:           map[int64]*model.DBInfo{},
		tables:            map[int64]table.Table{},
		tableAllocators:   map[int64]autoid.Allocator{},
		columns:           map[int64]*model.ColumnInfo{},
		indices:           map[indexName]*model.IndexInfo{},
		columnIndices:     map[int64][]*model.IndexInfo{},
		schemaMetaVersion: schemaMetaVersion,
	}
	var err error
	var hasOldInfo bool
	infoschema := h.Get()
	if infoschema != nil {
		hasOldInfo = true
	}
	for _, di := range newInfo {
		info.schemas[di.ID] = di
		info.schemaNameToID[di.Name.L] = di.ID
		for _, t := range di.Tables {
			alloc := autoid.NewAllocator(h.store, di.ID)
			if hasOldInfo {
				val, ok := infoschema.AllocByID(t.ID)
				if ok {
					alloc = val
				}
			}
			info.tableAllocators[t.ID] = alloc
			info.tables[t.ID], err = table.TableFromMeta(alloc, t)
			if err != nil {
				return errors.Trace(err)
			}
			tname := tableName{di.Name.L, t.Name.L}
			info.tableNameToID[tname] = t.ID
			for _, c := range t.Columns {
				info.columns[c.ID] = c
				info.columnNameToID[columnName{tname, c.Name.L}] = c.ID
			}
			for _, idx := range t.Indices {
				info.indices[indexName{tname, idx.Name.L}] = idx
				for _, idxCol := range idx.Columns {
					columnID := t.Columns[idxCol.Offset].ID
					columnIndices := info.columnIndices[columnID]
					info.columnIndices[columnID] = append(columnIndices, idx)
				}
			}
		}
	}
	// Build Information_Schema
	info.schemaNameToID[isDB.Name.L] = isDB.ID
	info.schemas[isDB.ID] = isDB
	for _, t := range isDB.Tables {
		tbl, ok := nameToTable[t.Name.L]
		if !ok {
			return errors.Errorf("table `%s` is missing.", t.Name)
		}
		info.tables[t.ID] = tbl
		tname := tableName{isDB.Name.L, t.Name.L}
		info.tableNameToID[tname] = t.ID
		for _, c := range t.Columns {
			info.columns[c.ID] = c
			info.columnNameToID[columnName{tname, c.Name.L}] = c.ID
		}
	}

	// Add Performance_Schema
	psDB := perfHandle.GetDBMeta()
	info.schemaNameToID[psDB.Name.L] = psDB.ID
	info.schemas[psDB.ID] = psDB
	for _, t := range psDB.Tables {
		tbl, ok := perfHandle.GetTable(t.Name.O)
		if !ok {
			return errors.Errorf("table `%s` is missing.", t.Name)
		}
		info.tables[t.ID] = tbl
		tname := tableName{psDB.Name.L, t.Name.L}
		info.tableNameToID[tname] = t.ID
		for _, c := range t.Columns {
			info.columns[c.ID] = c
			info.columnNameToID[columnName{tname, c.Name.L}] = c.ID
		}
	}
	// Should refill some tables in Information_Schema.
	// schemata/tables/columns/statistics
	dbNames := make([]string, 0, len(info.schemas))
	dbInfos := make([]*model.DBInfo, 0, len(info.schemas))
	for _, v := range info.schemas {
		dbNames = append(dbNames, v.Name.L)
		dbInfos = append(dbInfos, v)
	}
	err = refillTable(schemataTbl, dataForSchemata(dbNames))
	if err != nil {
		return errors.Trace(err)
	}
	err = refillTable(tablesTbl, dataForTables(dbInfos))
	if err != nil {
		return errors.Trace(err)
	}
	err = refillTable(columnsTbl, dataForColumns(dbInfos))
	if err != nil {
		return errors.Trace(err)
	}
	err = refillTable(statisticsTbl, dataForStatistics(dbInfos))
	if err != nil {
		return errors.Trace(err)
	}
	h.value.Store(info)
	return nil
}

// Get gets information schema from Handle.
func (h *Handle) Get() InfoSchema {
	v := h.value.Load()
	schema, _ := v.(InfoSchema)
	return schema
}

// Schema error codes.
const (
	CodeDbDropExists      terror.ErrCode = 1008
	CodeDatabaseNotExists                = 1049
	CodeTableNotExists                   = 1146
	CodeColumnNotExists                  = 1054

	CodeDatabaseExists = 1007
	CodeTableExists    = 1050
	CodeBadTable       = 1051
)

var (
	// DatabaseDropExists returns for drop an unexist database.
	DatabaseDropExists = terror.ClassSchema.New(CodeDbDropExists, "database doesn't exist")
	// DatabaseNotExists returns for database not exists.
	DatabaseNotExists = terror.ClassSchema.New(CodeDatabaseNotExists, "database not exists")
	// TableNotExists returns for table not exists.
	TableNotExists = terror.ClassSchema.New(CodeTableNotExists, "table not exists")
	// ColumnNotExists returns for column not exists.
	ColumnNotExists = terror.ClassSchema.New(CodeColumnNotExists, "field not exists")

	// DatabaseExists returns for database already exists.
	DatabaseExists = terror.ClassSchema.New(CodeDatabaseExists, "database already exists")
	// TableExists returns for table already exists.
	TableExists = terror.ClassSchema.New(CodeTableExists, "table already exists")
	// TableDropExists returns for drop an unexist table.
	TableDropExists = terror.ClassSchema.New(CodeBadTable, "unknown table")
)

func init() {
	schemaMySQLErrCodes := map[terror.ErrCode]uint16{
		CodeDbDropExists:      mysql.ErrDbDropExists,
		CodeDatabaseNotExists: mysql.ErrBadDb,
		CodeTableNotExists:    mysql.ErrNoSuchTable,
		CodeColumnNotExists:   mysql.ErrBadField,
		CodeDatabaseExists:    mysql.ErrDbCreateExists,
		CodeTableExists:       mysql.ErrTableExists,
		CodeBadTable:          mysql.ErrBadTable,
	}
	terror.ErrClassToMySQLCodes[terror.ClassSchema] = schemaMySQLErrCodes
}