GO语言——实现从MySQL数据库迁移到Amazon DynamoDB
目录
- GO语言——实现从MySQL数据库迁移到Amazon DynamoDB
- 一、前言
- 1.1 迁移背景
- 1.2 迁移问题
- 1.3 AWS官方迁移方法
- 二、思路与函数
- 2.1 思路
- 2.2 主函数
- 三、MySQL函数
- 3.1 查询所有非系统的数据库
- 3.2 查询数据表信息
- 3.3 查询数据表字段信息
- 3.4 查询表所有信息
- 四、Amazon DynamoDB
- 4.1 创建DynamoDB表
- 4.2 插入数据
- 附录:
- 参考
- 代码地址
一、前言
博主在学习Go语言,碰巧Leader要求研究一下能不能用脚本方式实现MySQL数据库到DynamoDB的迁移
这篇博文是博主使用Go语言实现的一种简单迁移,因为初学Go语言,代码不是很优美,也没有算法优化。
功能上, 只是简单的把数据一比一的迁移到DynamoDB。并且原MySQL数据库类型,在DynamoDB中都为String类型。
后续有时间会继续优化该程序,也欢迎Go语言的大佬们提出宝贵的修改和优化方案。
1.1 迁移背景
许多公司考虑从MySQL等关系数据库迁移到Amazon DynamoDB
Amazon DynamoDB是一项完全托管、快速、高度可扩展且灵活的NoSQL数据库。DynamoDB可以根据业务需求根据流量增加或减少容量。与典型的基于媒体的RDBMS相比,可以更轻松地优化服务的总成本
1.2 迁移问题
- 由于停机造成的服务中断,尤其是当客户服务必须24/7/365无缝可用时
- RDBMS和DynamoDB的不同键设计
1.3 AWS官方迁移方法
两种基于AWS托管服务的迁移:https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/
学习视频:https://www.youtube.com/watch?v=j88icq7JArI
二、思路与函数
2.1 思路
-
初始数据库连接和DynamoDB client
-
读取MySQL数据库
-
获取每个数据库中MySQL数据表
-
将数据表结构转为DynamoDB结构(字段,类型)
- 获取表字段并判断表字段是否为主键
-
创建DynamoDB表:
-
定义DynamoDB表名称:mysql数据库名_数据表
-
创建DynamoDB表
-
-
循环获取MySQL数据表的数据,加载到DynamoDB
-
获取所有列数据信息,以及行数
-
读取数据表的数据
-
把数据写入DynamoDB
2.2 主函数
main
函数功能为调用其他函数
初始化数据库连接,和初始化Amazon DynamoDB客户端
// 初始化数据库连接
db := Mysql.ConnectDB()
if db == nil {
fmt.Printf("init db failed!\n")
return
}
// 初始DynamoDB client
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
// Using the Config value, create the DynamoDB client
svc := dynamodb.NewFromConfig(cfg)
函数调用
// 1 读取数据库
Mysql.DatabaseInfo(db)
// 2 读取数据表
Mysql.TableInfo(db, database[i])
// 3 获取表字段
Mysql.TableFiledInfo(db, database[i], table[j])
// 4 创建DynamoDB表
DynamoDB.CreateDynamoDB(svc, field, tableName)
// 5.1 获取所有列数据信息,以及行数
Mysql.TableData(db, field, database[i], table[j])
// 5.3 把数据写入DynamoDB
DynamoDB.PutItemDynamoDB(svc , itemMap, tableName)
三、MySQL函数
3.1 查询所有非系统的数据库
在MySQL数据库中,INFORMATION_SCHEMA.TABLES
表存储了MySQL数据库的元数据。
元数据信息主要包括数据库中表信息以及表字段信息,可以从INFORMATION_SCHEMA.TABLES
表中查询数据库信息:
SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc
UPPER(table_type)='BASE TABLE'
只选择基础数据库,在MySQL数据库中,人为创建的数据库都为BASE TABLE
类型。
在MySQL数据库中,数据库mysql,performance_schema,sys
都属于BASE TABLE
类型。但是这三个数据库也是MySQL自带的数据库,不是用户数据库,需要排除。
func DatabaseInfo(db *sql.DB) []string {
sqlStr := `SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc`
rows, err := db.Query(sqlStr)
// 关闭查询
defer rows.Close()
if err != nil {
fmt.Printf("query table name error!err:%v\n", err)
return nil
//panic(err)
}
var result []string
for rows.Next() {
var tableName string
err = rows.Scan(&tableName)
if err != nil {
fmt.Printf("scan table name error!err:%v\n", err)
return nil
}
result = append(result, tableName)
}
return result
}
遍历查询结果,数据库名称以数组形式存储。
3.2 查询数据表信息
数据库中的所有表结构,从INFORMATION_SCHEMA.TABLES
表中查询。代码逻辑与查询数据库信息一致
sqlStr
查询语句,?
传入数据库名称
sqlStr := `SELECT table_name tableName
FROM INFORMATION_SCHEMA.TABLES
WHERE UPPER(table_type)='BASE TABLE'
AND LOWER(table_schema) = ?
GROUP BY table_name
ORDER BY table_name asc`
遍历查询结果,数据表名称以数组形式存储。
3.3 查询数据表字段信息
数据表中的所有字段结构,从INFORMATION_SCHEMA.TABLES
表中查询。代码逻辑与查询数据库信息一致
数据表字段结构体:
Fname
:表字段名ColumnKey
:表字段属性是否为主键(PRI)dataType
:表字段类型
type Field struct {
Fname string
ColumnKey string
dataType string
}
sqlStr
查询语句,?
传入数据库和数据表名称
sqlStr := `SELECT COLUMN_NAME fName,COLUMN_KEY columnKey,DATA_TYPE dataType
FROM information_schema.columns
WHERE table_schema = ? AND table_name = ?`
函数返回一个表字段结构体
3.4 查询表所有信息
在go语言原生的github.com/go-sql-driver/mysql
中的查询,需要指定与查询结果同样数量的变量才能把查询结果输出。
官方文档说明:https://pkg.go.dev/database/sql#Row.Scan
因此博主通过上个函数获取到的表字段,一个个字段查询,汇总为一个按列查询结构的map类型数据。
- 遍历字段数组,通过
Query
查询出,数据表中该列的值 - 利用
rows.Next
遍历查询结果,rows.Scan
获取列值,追加到数组中 - 使用map类型,以key(字段名):value[值数组]的方式存储一个表的所有数据
func TableData(db *sql.DB,field []Field,database, table string) (map[string][]string, int) {
result := make(map[string][]string)
var rowsLength int
for i := 0; i < len(field); i++ {
sqlStr := "SELECT " + field[i].Fname + " from " + database + "." + table
//fmt.Println(sqlStr)
rows, err := db.Query(sqlStr)
if err != nil {
fmt.Printf("Failed to query table! error: %v\n", err)
return nil, 0
}
defer rows.Close()
var columnValue string
var oneResult []string
for rows.Next() {
err = rows.Scan(&columnValue)
if err != nil {
fmt.Printf("Failed to scan a field in a table!err:%v\n", err)
return nil, 0
}
oneResult = append(oneResult, columnValue)
}
if len(oneResult) == 0 {
fmt.Printf("%v.%v not data!\n", database, table)
return nil, 0
}
result[field[i].Fname] = oneResult
rowsLength = len(oneResult)
}
return result, rowsLength
}
-
db: *sql.DB 数据库连接
-
field: []Field 数据表字段
-
database: string 数据库
-
table: string 数据表
-
return: map[string][]string 返回每一列的数据
四、Amazon DynamoDB
4.1 创建DynamoDB表
在DynamoDB的设计中,只有一个分区键和一个排序键。当然Amazon DynamoDB中,还可以添加全局索引和本地索引,这个方式复杂,在这里只是使用了分区键和排序键
因为DynamoDB只有两个键,并且必须指定一个分区键。
而在MySQL数据库中可能会存在两个以上或无主键的情况,面对这两种情况,博主通过判断前面获取的字段属性值。
如果存在两个主键,就以查询结果前面的两个主键分别作为分区键和排序键;若不存在主键,以查询结果第一个列为分区键。
在这里最好的方式应该是写成一个接口,按实际生产来修改每个数据表转换为DynamoDB后的格式
另外创建表,默认都是String类型,并没有判断原字段的格式。博主理解最好的方法应该是使用Go语言的反射机制来判断转换的DynamoDB字段类型
func CreateDynamoDB(svc *dynamodb.Client, field [] Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
var attributeDefinitions []types.AttributeDefinition
var keySchema []types.KeySchemaElement
for i :=0; i < len(field); i++ {
if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) < 1) {
// 第一个主键作为分区键
Attribute := []types.AttributeDefinition{
{
AttributeName: aws.String(field[i].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
schemaElement := []types.KeySchemaElement{
{
AttributeName: aws.String(field[i].Fname),
KeyType: types.KeyTypeHash,
},
}
attributeDefinitions = append(attributeDefinitions, Attribute...)
keySchema = append(keySchema, schemaElement...)
} else if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) >= 1) {
// 第二个主键作为排序键
Attribute := []types.AttributeDefinition{
{
AttributeName: aws.String(field[i].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
schemaElement := []types.KeySchemaElement{
{
AttributeName: aws.String(field[i].Fname),
KeyType: types.KeyTypeRange,
},
}
attributeDefinitions = append(attributeDefinitions, Attribute...)
keySchema = append(keySchema, schemaElement...)
}
// 当存在多于两个主键时,只选择前两个主键
if len(attributeDefinitions) >= 2 {
fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
break
}
}
// 如果不存在主键,以第一个表字段为DynamoDB的分区键
if len(attributeDefinitions) == 0 {
attributeDefinitions = []types.AttributeDefinition{
{
AttributeName: aws.String(field[0].Fname),
AttributeType: types.ScalarAttributeTypeS,
},
}
keySchema = []types.KeySchemaElement{
{
AttributeName: aws.String(field[0].Fname),
KeyType: types.KeyTypeHash,
},
}
fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
}
//fmt.Println(attributeDefinitions[1].AttributeName)
input := &dynamodb.CreateTableInput{
AttributeDefinitions: attributeDefinitions,
KeySchema: keySchema,
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
TableName: aws.String(tableName),
}
result, err := svc.CreateTable(context.TODO(),input)
if err != nil {
fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
return nil
}
// CreateTable为异步操作,需要等待一定时间,继续下一步
time.Sleep(time.Second * 5)
return result
}
4.2 插入数据
通过遍历获取到MySQL数据表的所有数据,将数据添加到符合DynamoDB格式的map中,调用PutItemInput
接口添加数据
添加的数据类型都是String类型,并没有判断原字段的格式。博主理解最好的方法应该是使用Go语言的反射机制来判断转换的DynamoDB字段类型
for k := 0; k < rowLength; k++ {
itemMap := make(map[string]types.AttributeValue)
for itemName, item := range tableData {
itemMap[itemName] = &types.AttributeValueMemberS{Value: item[k]}
}
// 5.3 把数据写入DynamoDB
putItemReuslt := DynamoDB.PutItemDynamoDB(svc , itemMap, tableName)
if putItemReuslt != nil {
fmt.Println("put Item succeed!")
} else {
panic(putItemReuslt)
}
}
func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput{
input := &dynamodb.PutItemInput{
Item: itemMap,
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
TableName: aws.String(tableName),
}
result, err := svc.PutItem(context.TODO(),input)
if err != nil {
fmt.Printf("Failed to put Item! error: %v\n", err)
return nil
}
return result
}
附录:
参考
MySQL Driver:https://github.com/Go-SQL-Driver/MySQL/
AWS Go DynamoDB SDKv2:https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem
代码地址
GitHub:https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb
Gitee:https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb