mysql基于binlog回滚工具_flashback(python版本)


 

    update、delete的条件写错甚至没有写,导致数据操作错误,需要恢复被误操作的行记录。这种情形,其实时有发生,可以选择用备份文件+binlog来恢复到测试环境,然后再做数据修复,但是这样其实需要耗费一定的时间跟资源。

    其实,如果binlog format为row,binlog文件中是会详细记录每一个事务涉及到操作,并把每一个事务影响到行记录均存储起来,能否给予binlog 文件来反解析数据库的行记录变动情况呢?
    业界已有不少相关的脚本及工具,但是随着MySQL版本的更新、binlog记录内容的变化以及需求不一致,大多数脚本不太适合个人目前的使用需求,所以开始着手编写 mysql的 flash back脚本。
 


 
    如果转载,请注明博文来源: www.cnblogs.com/xinysu/   ,版权归 博客园 苏家小萝卜 所有。望各位支持!
 


 
     仅在MySQL 5.6/5.7版本测试,python运行环境需要安装pymysql模块。

1 实现内容

    根据binlog文件,对某个\些事务、某段时间某些表、某段时间全库 做回滚操作,实现闪回功能。工具处理过程中,会把binlog中的事务修改的行记录存储到表格中去,通过 dml_sql 列,可以查看每一个事务内部的所有行记录变更情况,通过 undo_sql 查看回滚的SQL内容。如下图,然后再根据表格内容做回滚操作。 
      
    那么这个脚本有哪些优点呢?
  • 回滚分为2个命令:第一个命令 分析binglog并存储进入数据库;第二个命令 执行回滚操作;
  • 回滚的时候,可以把执行脚本跟回滚脚本统一存放到数据库中,可以查看 更新内容以及回滚内容;
  • 根据存储的分析表格,方便指定事务或者指定表格来来恢复;
  • 详细的日志输出,说明分析进度跟执行进度。
    分析binlog的输出截图(分析1G的binlog文件)
   
 
   回滚数据库的输出截图:
   

2 原理

    前提:实例启动了binlog并且格式为ROW。
    
    使用python对mysqlbinlog后的log文件进行文本分析处理,在整个处理过程中,有以下6个疑难点需要处理:
  1. 判断一个事务的开始跟结束
  2. 同一个事务的执行顺序需要反序执行
  3. 解析回滚SQL
  4. 同一个事务操作不同表格处理
  5. 转义字符处理,比如换行符、tab符等等
  6. timestamp数据类型参数值转换
  7. 负数处理
  8. 单个事务涉及到行修改SQL操作了 max_allow
  9. 针对某个表格做回滚,而不是全库回滚
    

2.1 事务的开始与结束

    按照Xid出现的位置来判断,从binlog文件的最开始开始读取,遇到SQL语句则提取出来,直到遇到Xid,统一把之前提取出来的SQL汇总为一个事务,然后继续提取SQL语句,直到遇到下一个Xid,再把这个事务的SQL汇总成一个事务,一直这样循环,直至文件顺序遍历结束。
   

 

2.2 事务内部反序处理

    同一个事务中,如果有多个表格多行记录发生变更,在回滚的时候,应该反序回滚SQL,那么,如何将提取出来的SQL反序存储呢?思路如下:
  • 每行记录的修改SQL独立出来
  • 将独立出来的SQL反序存储
   假设正序的事务SQL语句存储在变量 dml_sql 中,反序后的可以回滚的SQL存储在变量 undo_sql中。按顺序把行记录修改的SQL抽取出来 存储到变量 record_sql 中去,然后 赋值 undo_sql =record_sql + undo_sql ,再置空 record_sql 变量,如此,便可实现反序事务内部的执行SQL。

2.3 解析回滚SQL

    首先,查看binlog的日志内容,发现行修改的SQL情形如下,提取过程中需要注意这几个问题:
  • 行记录的列名配对,binlog file存储的列序号,不能直接使用
  • WHERE部分 跟 SET部分 之间并无关键字或者符号,需要添加 AND 或者 逗号
  • DELETE SQL 需要反转为 INSERT
  • UPDATE SQL 需要把WHERE 跟 SET的部分进行替换
  • INSERT SQL需要反转为 DELETE

2.4 同事务不同表格处理

    同一个事务中,允许对不同表格进行数据修改,这点在列名替换列序号的时候,需要留意处理。
    每一个的行记录前有一行记录,含有 'Table_map' 标识,会说明这一行当行记录是修改哪个表格,可以根据这个提示,来替换binlog里边的列序号为列名。

2.5 转义字符处理

    binlog文件在对非空格的空白字符处理,采用转义字符字符串存储,比如,在表格insert一列记录含换行符,而实际上在binlog文件中,是使用了 \x0a 替换了 换行操作,所以在回滚数据的过程中,需要对转义字符做处理。
   

   

 

    
    这里注意一个地方,039的转义字符是没有在函数 esc_code 中统一处理,而是单独做另外处理。
   

 

    转移字符表相见下图:
   

 

2.6 timestamp数据类型处理

    timestamp实际在数据库中的存储值是 INT类型,需要使用 函数 from_unixtime转换。
    建立测试表格tbtest,只有一列timestamp的列,存储值后查看binlog的内容,具体截图如下:
   

 

    在处理行记录的时候,要对timestamp的value做处理,添加from_unixtime函数转换。

2.7 负数值处理

    这个一开始写代码的时候,并没有考虑到。大量测试的过程中发现,所有整型的数据类型,在存储负数的时候,都会存入一个最大范围值。binlog在处理这块的机制有些不是很了解。测试如下:
   

 

   所以当遇到INT的各种数据类型并且VALUE为负数的时候,需要把 这个范围值去除,才能执行执行undo_sql。

2.8 单个事务行记录总SQL超过max_allowed_package处理

    分析binlog后存储两种sql类型,一种是行记录的修改SQL,即 dml_sql;一种是 行记录的回滚sql,即 undo_sql。从代码可知,存储这两个sql的列是longtext,最大可存储4G的内容。但是 MySQL中单个会话的包大小是有限制的,限制的参数为 max_allowed_packet,默认大小为 4Mb,最大为1G,所以这个脚本使用前,请手动设置 存储binlog file的数据库实例以及线上的数据库实例这个参数:
set global max_allowed_packet = 1073741824; #记得后续修改回来
    
    万一操作了呢?那么回滚只能分段来回滚,先回滚到 这个大事务,然后单独执行这个大事务,紧接着继续回滚,这部分不能使用pymysql嗲用source 文件执行,所以只能手动做这个操作。 求高能人士修改这个逻辑代码!!!

2.9 针对性回滚

    假设误操作的没有明确的时间点,只有一个区间,而这个区间还有其他的表格操作,那么这个时候,需要在分析binlog文件的时候,添加--database选项,先帅选到同一个数据库中binlog文件中。
    这里的处理是将这段区间的dml_sql跟undo_sql都存储到数据库表格中,然后再删除不需要回滚的事务,剩余需要回滚的事务。再执行回滚操作。

3 使用说明

3.1 参数说明

    这个脚本的参数稍微多些,可以 --help 查看具体说明。
   

 

    本人喜欢用各种颜色来分类参数(blingbling五颜六色,看着多有趣多精神),所以,按颜色来说明这些参数。
  • 黄色区域:这6个参数,提供的是 分析并存储binlog file的相关值,说明存储分析结果的数据库的链接方式、binlog文件的位置以及存储结果的表格名字;
  • 蓝色区域:这4个参数,提供 与线上数据库表结构一致的DB实例连接方式,仅需跟线上一模一样的表结构,不一定需要是主从库;
  • 绿色区域:最最重要的选项 -a,0代表仅分析binlog文件,1代表仅执行回滚操作,必须先执行0才可以执行1;
  • 紫色区域:举例说明。

3.2 应用场景说明

  • 全库回滚某段时间
    • 需要回滚某个时间段的所有SQL操作,回滚到某一个时间点
    • 这种情况下呢,大多数是使用备份文件+binlog解决
    • 但是这个脚本也可以满足,但请勿直接在线上操作,先 -a=0,看下分析结果,是否符合,符合的话,停掉某个从库,再在从库上执行,最后开发业务接入检查是否恢复到指定时间点,数据是否正常。
  • 某段时间某些表格回滚某些操作
    • 比如,开发提交了一个批量更新脚本,各个测试层面验证没有问题,提交线上执行,但是执行后,发现有个业务漏测试,导致某些字段更新后影响到其他业务,现在需要紧急 把被批量更新的表格回滚到原先的行记录
    • 这个并不能单纯从技术角度来处理,要综合考虑
      •  

      • 这种情况下,如何回顾tab A表格的修改操作呢?
      • 个人觉得,这种方式比较行得通,dump tabA表格的数据到测试环境,然后再分析 binlog file 从11点-12点的undo sql,接着在测试环境回滚该表格到11点这个时刻,紧接着,由开发跟业务对比测试环境11点的数据跟线上现有的数据中,看下是哪些行哪些列需要在线上进行回滚,哪些是不需要的,然后开发再提交SQL脚本,再在线上执行。其实,这里边,DBA仅提供一个角色,就是把 表格 tab A 在一个新的环境上,回滚到某个时间点,但是不提供直接线上回滚SQL的处理。
  • 回滚某个/些SQL
    • 这种情况比较常见,某个update某个delete缺少where条件或者where条件执行错误
    • 这种情况下,找到对应的事务,执行回滚即可,回滚流程请参考上面一说,对的,我就是这么胆小怕事 

       

3.3 测试案例

3.3.1 全库回滚某段时间

假设需要回滚9点10分到9点15分间数据库的所有操作:
  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 线上测试环境修改set global max_allowed_packet = 1073741824
  • 回滚数据,action=1
  • 线上测试环境修改set global max_allowed_packet = 4194304
 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 分析帅选需要的事务,rename表格
  • dump 对应的表格到测试环境
  • 回滚数据,action=1
  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 分析帅选需要的事务,rename表格
  • dump 对应的表格到测试环境
  • 回滚数据,action=1
  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载:https://github.com/xinysu/mysql.git 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter  31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action,  39         0 just analyse the binlog file ,and store sql in table;  40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m   42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent  47                        -oh=192.168.9.244 -oP=3310 -u=root -op=***  48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent  52                        -oh=192.168.9.244 -oP=3310 -u=root -op=***  53                        -a=1 54 \033[0m                         55 ''' 56  57 class flashback: 58 def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105         logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108             sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111             sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114                 _argv = i.split('=')115 if _argv[0] == '-h':116                     self.host = _argv[1]117 elif _argv[0] == '-u':118                     self.user = _argv[1]119 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190 else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244 else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292 elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295 elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298 elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323 else:324                         bline = ''325 326 if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332         esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338 for k,v in esc.items():339             sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352 for row in count_row:353 if row['sqltype']==1:354                 insert_couont=row['numbers']355 elif row['sqltype']==2:356                 update_count=row['numbers']357 elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id<=tran_num:381             logging.info('doing batch : {} '.format(int(id/number)+1))382             undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=''387 388 for u_row in undo_rows:389 try:390                     self.on_cur.execute(u_row['undo_sql'])391                     self.on_mysqlconn.commit()392 except Exception:393 print('auto_id:',u_row['auto_id'])394             id+=number395 396 397 def undo_file(self,number):398 # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408 for num in tran_rows:409             tran_num=num['table_rows']410 411         logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')412         logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))413 414         with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:415 while id<=tran_num:416                 logging.info('doing batch : {} '.format(int(id/number)+1))417                 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421 for u_row in undo_rows:422 try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427 except Exception:428 print('auto_id',u_row['auto_id'])429 #time.sleep(2)430                 id+=number431 432 def do(self):433 if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436 #self.binlogdesc()437 elif self.action=='1':438             self.undosql(10000)439 440 def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()

 

注:本文转载自http://www.cnblogs.com/xinysu/p/7052923.html,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除。
上一篇 下一篇

评论



最新评论

PHP笔记: ypengchao@126.com 查看原文 06月26日 11:38
PHP笔记: 用数据库 查看原文 06月26日 11:37
网站/shl设计: 如果关闭浏览器了,session就没有了呀。再打开浏览器不就可以重新登陆了。 查看原文 05月28日 15:26
网站/shl设计: 站长怎么联系你有给问题请教。 查看原文 05月28日 15:25
网站/shl设计: 说的不错! 查看原文 05月28日 15:22

分享

扫一扫,快速分享到微信

赞助商