In a recent data migration, I needed to export data from MySQL, transform it, and import it into new tables and Elasticsearch (ES). This is a quick write-up for future reference.

Note: for readability and privacy, the real setup is simplified here. Table structures are trimmed down, all database connection details are shown as xxx, and directory and file names are aliases.

Walkthrough

Original table:

book_db 库
  - b_book(id,create_time,update_time,price,title,intro)

New tables:

book 库
  - book(id,price,title,create_time,update_time)
  - book_ext(id,book_id,intro,create_time)

Exporting from MySQL

mkdir -p /tmp/

# export the raw data
mysql -hxxx -uxxx -pxxx book_db --default-character-set=utf8 -e 'select id,create_time,update_time,price,title,intro from b_book' | sed 's/NULL//g' > /tmp/b_book.csv

The sed 's/NULL//g' strips the literal NULLs: some columns in the export contain NULL, and the new tables should not store the string NULL, so we drop it.
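A quick sanity check on a made-up row (note that this global replacement would also touch a title that happened to contain the string NULL):

# made-up three-column row: NULL, 9.90, NULL
printf 'NULL\t9.90\tNULL\n' | sed 's/NULL//g'
# -> the two NULLs become empty fields; only the tabs and 9.90 remain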

The exported rows are tab-separated (\t) by default, and the first line holds the column names. Delete that first line:

sed -i '1d' /tmp/b_book.csv

Data Processing

cd /tmp/

# convert create_time, update_time and price, and write book.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{gsub(/[-:]/," ",$2); $2=mktime($2); gsub(/[-:]/," ",$3); $3=mktime($3); $4=$4*100; $6=""; print $0}' > book.csv

# write book_ext.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{print $1,$6}' > book_ext.csv

# write book_es.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{$4=$4*100; print $0}' > book_es.csv

The original table stores times as datetime while the new tables store Unix timestamps, so the times are converted here. Prices are stored in yuan in the original table; multiplying by 100 converts them to cents.
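A quick sanity check of the datetime-to-timestamp step (mktime is a gawk function, and the result depends on the server's time zone):

echo '2018-11-01 12:00:00' | awk '{gsub(/[-:]/," ",$0); print mktime($0)}'
# prints the Unix timestamp for 2018-11-01 12:00:00 local time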

-v OFS=' @@@ ' makes awk emit ' @@@ ' as the output column separator. The reason is that the intro column stores HTML, which can contain the usual delimiter and escape characters; the unusual @@@ token makes sure each column can still be split unambiguously.
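A small illustration of why a plain tab would be fragile here, using a made-up row whose intro itself contains a tab:

printf '1\t<p>hello\tworld</p>\n' | awk -F '\t' '{print NF}'
# prints 3: the tab inside the intro is indistinguishable from a column separator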

Importing into MySQL

mysql -hxxx -uxxx -pxxx book

LOAD DATA LOCAL INFILE '/tmp/book.csv' INTO TABLE book
CHARACTER SET utf8
FIELDS TERMINATED BY ' @@@ ' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '\n'
(id,create_time,update_time,price,title);

LOAD DATA LOCAL INFILE '/tmp/book_ext.csv' INTO TABLE book_ext
CHARACTER SET utf8
FIELDS TERMINATED BY ' @@@ ' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '\n'
(book_id,intro);

Notes:

  • Fields Terminated By: the column separator. Typically a space or \t.
  • Enclosed By: the field quoting character. Use the empty string if there is none.
  • Escaped By: the escape character. Use the empty string if there is none.
  • Lines Terminated By: the row terminator.

INTO TABLE means insert: if a record already exists (unique-key conflict), the load fails and stops. REPLACE INTO TABLE means overwrite: an existing record is replaced as a whole, and columns not listed get their default values. IGNORE INTO TABLE simply skips records that already exist.
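For example, to re-run the book load on top of existing rows, only the keyword changes; a sketch of the REPLACE variant of the statement above:

LOAD DATA LOCAL INFILE '/tmp/book.csv' REPLACE INTO TABLE book
CHARACTER SET utf8
FIELDS TERMINATED BY ' @@@ ' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '\n'
(id,create_time,update_time,price,title);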

Importing into ES

The generated book_es.csv is fairly large, so it is split into files of 20,000 lines each; otherwise a single file could be too big and the ES import could fail.

cd /tmp/

awk '{filename = "book_es.csv." int((NR-1)/20000) ".csv"; print >> filename}' book_es.csv
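An optional sanity check: the line counts of the chunks should add up to the original file's count.

wc -l book_es.csv book_es.csv.*.csv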

ConvertBookToEs.php is a PHP script that generates files in the ES bulk-import format; see the appendix. Running it produces one book_es.csv.*.csv.json file per chunk.

php ConvertBookToEs.php
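Each output file is newline-delimited JSON in the bulk format: an action line followed by a document line. A quick spot check (the values shown are illustrative):

head -2 book_es.csv.0.csv.json
# {"index":{"_id":1}}
# {"id":1,"create_time":1541044800,"update_time":1541044800,"price":990,"title":"...","intro":"..."}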

importToEs.sh is the bulk-import script:

#!/bin/bash
for file in /tmp/book_es.csv.*.csv.json
do
   echo "$file"
   curl -XPOST http://xxx:9200/book/doc/_bulk -H "Content-Type: application/json" --data-binary "@$file" >> importToEs.log
done

Run the script:

sh importToEs.sh

A few minutes later, the import is done.
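Two optional spot checks (assuming the same host placeholder): the ES document count should match the source row count, and the bulk responses logged above should not report errors.

curl -s 'http://xxx:9200/book/_count'
grep -c '"errors":true' importToEs.log   # 0 means no bulk request reported errors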

Batch Updates by Key with CASE WHEN

The basic shapes:

Updating a single column:

UPDATE categories SET
    display_order = CASE id
        WHEN 1 THEN 3
        WHEN 2 THEN 4
        WHEN 3 THEN 5
    END
WHERE id IN (1,2,3)

Updating multiple columns:

UPDATE categories SET
    display_order = CASE id
        WHEN 1 THEN 3
        WHEN 2 THEN 4
        WHEN 3 THEN 5
    END,
    title = CASE id
        WHEN 1 THEN 'New Title 1'
        WHEN 2 THEN 'New Title 2'
        WHEN 3 THEN 'New Title 3'
    END
WHERE id IN (1,2,3)

A PHP wrapper:


    /**
     * Build a batch UPDATE statement
     * @param $data array rows to update, as a two-dimensional array
     * @param $field string the key column whose value differs per row, typically id
     * @param $table string the table to update
     * @param array $params extra equality conditions shared by all rows, as key => value pairs
     * @return bool|string
     */
    function batchUpdate($data, $field, $table, $params = [])
    {
        if (!is_array($data) || !$field || !is_array($params)) {
            return false;
        }

        // build the IN (...) list from the key column
        $in_fields = array_column($data, $field);
        $in_fields = implode(',', array_map(function ($value) {
            return "'" . $value . "'";
        }, $in_fields));

        $updates = parseUpdate($data, $field);
        $where = parseParams($params);

        $sql = sprintf("UPDATE `%s` SET %s WHERE `%s` IN (%s) %s;\n", $table, $updates, $field, $in_fields, $where);

        return $sql;
    }

    /**
     * Turn a two-dimensional array into CASE WHEN THEN batch-update clauses
     * @param $data array two-dimensional array of rows
     * @param $field string the key column name
     * @return string SQL fragment
     */
    function parseUpdate($data, $field)
    {
        $sql = '';
        $keys = array_keys(current($data));
        foreach ($keys as $column) {
            if ($column == $field) { // skip the key column itself
                continue;
            }

            $sql .= sprintf("`%s` = CASE `%s` \n", $column, $field);
            foreach ($data as $line) {
                $sql .= sprintf("WHEN \'%s\' THEN \'%s\' \n", $line[$field], $line[$column]);
            }
            $sql .= "END,";
        }

        return rtrim($sql, ',');
    }

    /**
     * Build the additional WHERE conditions
     * @param $params
     * @return string
     */
    function parseParams($params)
    {
        $where = [];
        foreach ($params as $key => $value) {
            $where[] = sprintf("`%s` = '%s'", $key, $value);
        }

        return $where ? ' AND ' . implode(' AND ', $where) : '';
    }

Example call:

$data = [
    ['id' => 1, 'parent_id' => 100, 'title' => 'A', 'sort' => 1],
    ['id' => 2, 'parent_id' => 100, 'title' => 'A', 'sort' => 3],
    ['id' => 3, 'parent_id' => 100, 'title' => 'A', 'sort' => 5],
    ['id' => 4, 'parent_id' => 100, 'title' => 'B', 'sort' => 7],
    ['id' => 5, 'parent_id' => 101, 'title' => 'A', 'sort' => 9],
];

echo batchUpdate($data, 'id', "post");

The generated SQL:

UPDATE `post` SET `parent_id` = CASE `id` 
WHEN '1' THEN '100' 
WHEN '2' THEN '100' 
WHEN '3' THEN '100' 
WHEN '4' THEN '100' 
WHEN '5' THEN '101' 
END,`title` = CASE `id` 
WHEN '1' THEN 'A' 
WHEN '2' THEN 'A' 
WHEN '3' THEN 'A' 
WHEN '4' THEN 'B' 
WHEN '5' THEN 'A' 
END,`sort` = CASE `id` 
WHEN '1' THEN '1' 
WHEN '2' THEN '3' 
WHEN '3' THEN '5' 
WHEN '4' THEN '7' 
WHEN '5' THEN '9' 
END WHERE `id` IN ('1','2','3','4','5') ;
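To apply it, the generated statement can be piped straight into the mysql client (the script name here is hypothetical):

php batch_update_demo.php | mysql -hxxx -uxxx -pxxx book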

Emulating Per-Column Updates with MySQL LOAD DATA

LOAD DATA INFILE is by far the fastest way to load large amounts of data into MySQL. However, while it can be run with INSERT IGNORE or REPLACE semantics, it currently does not support ON DUPLICATE KEY UPDATE.

So if we want to batch-update certain columns, how can ON DUPLICATE KEY UPDATE be simulated with LOAD DATA INFILE?

An answer on Stack Overflow gives a recipe. The steps:

1) Create a new temporary table.

CREATE TEMPORARY TABLE temporary_table LIKE target_table;

2) Drop all indexes from the temporary table to speed things up. (Optional)

SHOW INDEX FROM temporary_table;

DROP INDEX `PRIMARY` ON temporary_table;
DROP INDEX `some_other_index` ON temporary_table;

3) Load the CSV into the temporary table.

LOAD DATA INFILE 'your_file.csv'
INTO TABLE temporary_table
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '\n'
(field1, field2);

4) Copy the data over using ON DUPLICATE KEY UPDATE.

SHOW COLUMNS FROM target_table;

INSERT INTO target_table
SELECT * FROM temporary_table
ON DUPLICATE KEY UPDATE field1 = VALUES(field1), field2 = VALUES(field2);

MySQL assumes the part before the = refers to the columns named in the INSERT INTO clause, and the part after it refers to the SELECT columns.
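If the two tables' column orders could ever diverge, it seems safer to name the columns explicitly on both sides. A sketch with the same placeholder names, assuming field1 is the unique key; it must run in the same session as the LOAD, since temporary tables are session-scoped:

INSERT INTO target_table (field1, field2)
SELECT field1, field2 FROM temporary_table
ON DUPLICATE KEY UPDATE field2 = VALUES(field2);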

5) Drop the temporary table.

DROP TEMPORARY TABLE temporary_table;

With SHOW INDEX FROM and SHOW COLUMNS FROM, this procedure can be automated for any given table.

Note: the official documentation marks INSERT ... SELECT ON DUPLICATE KEY UPDATE statements as unsafe for statement-based replication, so test this approach thoroughly before using it. See:
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html

Appendix

ConvertBookToEs.php

<?php
/**
 * Convert b_book rows into the ES bulk format (JSON)
 */

//id,create_time,update_time,price,title,intro
function dealBook($file)
{
    $fp = fopen($file, 'r');
    while (!feof($fp)) {
        $line = explode(' @@@ ', fgets($fp, 65535));
        if ($line && isset($line[1])) {
            $arr_head = [
                'index' => [
                    '_id' => (int)$line[0]
                ]
            ];
            $arr = [
                'id' => (int)$line[0],
                'create_time' => strtotime($line[1]),
                'update_time' => strtotime($line[2]),
                'price' => intval($line[3]),
                'title' => (string)$line[4],
                'intro' => (string)$line[5], // intro is the 6th column in the simplified schema
            ];

            file_put_contents($file . '.json', json_encode($arr_head, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
            file_put_contents($file . '.json', json_encode($arr, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
        }
    }
    fclose($fp);
}

try {

    // convert the CSV chunks into the ES bulk JSON format
    // see https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
    $files = glob("/tmp/book_es.csv.*.csv");
    if (!$files) { // glob() returns [] when nothing matches and false on error
        exit("no csv files found");
    }
    $pids = [];

    foreach ($files as $i => $file) {
        $pid = pcntl_fork();
        if ($pid < 0) {
            exit("could not fork");
        }

        if ($pid > 0) {
            $pids[$pid] = $pid;
        } else {
            echo time() . " new process, pid:" . getmypid() . PHP_EOL;
            dealBook($file);
            exit();
        }
    }

    while (count($pids)) {
        foreach ($pids as $key => $pid) {
            $res = pcntl_waitpid($pid, $status, WNOHANG);
            if ($res == -1 || $res > 0) {
                echo 'Child process exited, pid ' . $pid . PHP_EOL;
                unset($pids[$key]);
            }
        }
        sleep(1);
    }

} catch (Exception $e) {
    $message = $e->getFile() . ':' . $e->getLine() . ' ' . $e->getMessage();
    echo $message;
}

References

1. Linux command-line text tools (cnblogs)
https://www.cnblogs.com/52fhy/p/5836429.html
2. Exporting CSV with mysqldump: --fields-terminated-by (CSDN)
https://blog.csdn.net/superhosts/article/details/26054997
3. Batch Processing | Elasticsearch Reference [6.4] | Elastic
https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
4. Notes on MySQL LOAD DATA INFILE usage (cnblogs)
https://www.cnblogs.com/conanwang/p/5890753.html
5. MySQL LOAD DATA INFILE with ON DUPLICATE KEY UPDATE (Stack Overflow)
https://stackoverflow.com/questions/15271202/mysql-load-data-infile-with-on-duplicate-key-update
6. INSERT INTO ... SELECT FROM ... ON DUPLICATE KEY UPDATE (Stack Overflow)
https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update
7. MySQL 5.6 Reference Manual: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE Syntax
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
8. SQL statements for copying table structure and data (cnblogs)
https://www.cnblogs.com/zhengxu/articles/2206894.html
9. Batch-updating data in MySQL (cnblogs)
https://www.cnblogs.com/ldj3/p/9288187.html

Copyright notice: this is an original article by 52fhy, licensed under CC 4.0 BY-SA. Please include a link to the original and this notice when republishing.
Original post: https://www.cnblogs.com/52fhy/p/10051338.html