简单记录一下,使用node的流读取mysql数据,解决分页扫表慢的问题。
场景
有一个计算任务,需要逐条扫描一张大表里的符合条件的数据,并且做一些简单的计算插入一张小表中去,类似select * from tableA where condition > 1。假设符合条件的数据有1000w条,使用分页,每页1000条,逐页查询并计算完插入小表。
瓶颈
mysql分页的特性。。就是越往后越慢,晚上计算任务又多,很容易跑不完或者挂掉。
哪怕优化了分页,效果还是不如意。
解决方案
我们用的 knex.js,不能算是orm,只能算是一个sql builder吧。
使用knex的stream创建一个流,然后使用node的流事件.on('data')监听这个流,每查询一条数据都会触发流事件。
这里只是提供了一个思路,上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| 'use strict'; const logger = require('./filterLogger') const _ = require('lodash') const knex = require("./db")
function streamReadToWriteDB(sql, dealFunction, pageSize) { let n = 0; let tempData = []; return new Promise(function (resolve, reject) { knex.raw(sql) .stream(function (stream) { stream.on('data', data => { tempData.push(data) n++; if (tempData.length >= pageSize) { stream.pause() dealFunction(tempData) .then(() => { tempData = [] stream.resume() }) } }) stream.on('end', () => { if (!_.isEmpty(tempData)) { logger.trace(`任务剩余${tempData.length}条未处理,即将处理!`) dealFunction(tempData) .then(results => { tempData = [] logger.trace(`任务执行完毕!一共${n}片数据,每片${pageSize}条`) resolve(results) }) } else { logger.trace(`任务执行完毕!一共${n}片数据,每片${pageSize}条`)
resolve() } }) }) .then(() => { return Promise.resolve(true) }) .catch(err => { logger.warn('流处理出错', err) reject() }) }) }
module.exports = { streamReadToWriteDB }
|
代码中的pagesize是嫌弃一次处理一条数据太浪费了,多缓存几个一起处理。
这中写法效果非常好,千万数据量速度快了几十倍。。
当然和es的scroll写法一样,流不支持跳页。