In this blog, we will discuss MySQL 5.7 asynchronous query execution using the X Plugin.
Overview
MySQL 5.7 supports the X Plugin / X Protocol, which allows (if the client library supports it) asynchronous query execution. In 2014, I published a blog post on how to increase slow query performance with parallel query execution. There, I created a prototype in the bash shell. Here, I’ve tried a similar idea with NodeJS + the mysqlx library (which uses the MySQL X Plugin).
TL;DR version: By using the MySQL X Plugin with NodeJS I was able to increase query performance 10x (some query rewrite required).
X Protocol and NodeJS
Here are the steps required:
- First, we will need to enable X Plugin in MySQL 5.7.12+, which will use a different port (33060 by default).
- Second, download and install NodeJS (>4.2) and mysql-connector-nodejs-1.0.2.tar.gz (follow the Getting Started with Connector/Node.JS guide):

```shell
# node --version
v4.4.4
# wget https://dev.mysql.com/get/Downloads/Connector-Nodejs/mysql-connector-nodejs-1.0.2.tar.gz
# npm install mysql-connector-nodejs-1.0.2.tar.gz
```
Please note: on older systems, you will probably need to upgrade the NodeJS version first (follow the Installing Node.js via package manager guide).
- All set! Now we can use the asynchronous query feature.
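For the first step, the X Plugin can be installed at runtime. A minimal sketch (the shared-library name assumes Linux; on Windows it is mysqlx.dll):

```sql
-- Install the X Plugin into a running MySQL 5.7.12+ server
INSTALL PLUGIN mysqlx SONAME 'mysqlx.so';
-- Verify the port the plugin listens on (33060 by default)
SELECT @@mysqlx_port;
```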
Test data
I’m using the same Wikipedia Page Counts dataset (wikistats) I’ve used for my Apache Spark and MySQL example. Let’s imagine we want to compare the popularity of MySQL versus PostgreSQL in January 2008 (comparing the total page views). Here are the sample queries:
```sql
mysql> select sum(tot_visits) from wikistats_by_day_spark where url like '%mysql%';
mysql> select sum(tot_visits) from wikistats_by_day_spark where url like '%postgresql%';
```
The table only holds data for the English Wikipedia for January 2008, but still has ~200M rows and is ~16G in size. Both queries run for ~5 minutes each and utilize only one CPU core (one connection = one CPU core). The box has 24 CPU cores (Intel(R) Xeon(R) CPU L5639 @ 2.13GHz). Can we run the query in parallel, utilizing all cores?
That is now possible with NodeJS and the X Plugin, but it requires some preparation:
- Partition the table by hash into 24 partitions:

```sql
CREATE TABLE `wikistats_by_day_spark_part` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `mydate` date NOT NULL,
  `url` text,
  `cnt` bigint(20) NOT NULL,
  `tot_visits` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=239863472 DEFAULT CHARSET=latin1
/*!50100 PARTITION BY HASH (id)
PARTITIONS 24 */
```
- Rewrite the query to run one connection (= one thread) per partition, each thread selecting its own partition:

```sql
select sum(tot_visits) from wikistats_by_day_spark_part partition (p<N>) where url like '%mysql%';
```
- Wrap it all up inside NodeJS callback functions / Promises.
The code
```javascript
var mysqlx = require('mysqlx');
var cs_pre = { host: 'localhost', port: 33060, dbUser: 'root', dbPassword: 'mysql' };
var cs = { host: 'localhost', port: 33060, dbUser: 'root', dbPassword: 'mysql' };
var partitions = [];
var res = [];
var total = 0;
mysqlx.getNodeSession(cs_pre).then(session_pre => {
  var sql = "select partition_name from information_schema.partitions where table_name = 'wikistats_by_day_spark_part' and table_schema = 'wikistats' ";
  session_pre.executeSql(sql)
    .execute(function (row) {
      partitions.push(row);
    }).catch(err => {
      console.log(err);
    })
    .then(function () {
      partitions.forEach(function (p) {
        mysqlx.getNodeSession(cs).then(session => {
          var sql = "select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(" + p + ") where url like '%mysql%';"
          console.log("Started SQL for partition: " + p);
          return Promise.all([
            session.executeSql(sql)
              .execute(function (row) {
                console.log(p + ":" + row);
                res.push(row);
                total = Number(total) + Number(row);
              }).catch(err => {
                console.log(err);
              }),
            session.close()
          ]);
        }).catch(err => {
          console.log(err + " partition: " + p);
        }).then(function () {
          // All done
          if (res.length == partitions.length) {
            console.log("All done! Total: " + total);
            // can now sort the "res" array if needed and display
          }
        });
      });
    });
  session_pre.close();
});
console.log("Starting...");
```
The explanation
The idea here is rather simple:
- Find all the partitions for the table by using “select partition_name from information_schema.partitions”
- For each partition, run the query in parallel: create a connection, run the query with a specific partition name, define the callback function, then close the connection.
- As a callback function is used, the code does not block, but rather proceeds to the next iteration. When a query finishes, its callback function is executed.
- Inside the callback function, I’m saving the result into an array and also calculating the total (actually I only need the total in this example):

```javascript
.execute(function (row) {
  console.log(p + ":" + row);
  res.push(row);
  total = Number(total) + Number(row);
  ...
```
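The non-blocking flow described above can be sketched without MySQL at all. Here, runQuery is a made-up stand-in that simulates a per-partition query with a timer; the partition names, delays, and per-partition sums are assumptions for illustration only:

```javascript
// Simulate one "query" per partition. Each resolves after its own delay,
// so results arrive in completion order while the loop itself never blocks.
var partitions = ['p0', 'p1', 'p2'];
var delays = { p0: 30, p1: 10, p2: 20 }; // ms; p1 finishes first
var res = [];
var total = 0;

function runQuery(p) { // stand-in for session.executeSql(sql).execute(cb)
  return new Promise(function (resolve) {
    setTimeout(function () {
      var row = 100; // pretend each partition returns sum(tot_visits) = 100
      res.push(p + ':' + row); // callback work: save the row, update the total
      total += row;
      resolve();
    }, delays[p]);
  });
}

var allDone = Promise.all(partitions.map(runQuery)).then(function () {
  console.log(res.join(' ')); // completion order: p1 first, p0 last
  console.log('All done! Total: ' + total);
});
console.log('Starting...'); // printed first: the loop above did not block
```

Just as in the real script, "Starting..." appears before any results, and the results themselves arrive in whatever order the "queries" complete.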
Asynchronous Salad: tomacucumtoes,bersmayonn,aise *
This may blow your mind: because everything is running asynchronously, the callback functions will return when ready. Here is the result of the above script:
```
$ time node async_wikistats.js
Starting...
Started SQL for partition: p0
Started SQL for partition: p1
Started SQL for partition: p2
Started SQL for partition: p3
Started SQL for partition: p4
Started SQL for partition: p5
Started SQL for partition: p7
Started SQL for partition: p8
Started SQL for partition: p6
Started SQL for partition: p9
Started SQL for partition: p10
Started SQL for partition: p12
Started SQL for partition: p13
Started SQL for partition: p11
Started SQL for partition: p14
Started SQL for partition: p15
Started SQL for partition: p16
Started SQL for partition: p17
Started SQL for partition: p18
Started SQL for partition: p19
Started SQL for partition: p20
Started SQL for partition: p21
Started SQL for partition: p22
Started SQL for partition: p23
```
… here the script will wait for the async calls to return, and they will return when ready – the order is not defined.
Meanwhile, we can watch MySQL processlist:
```
+------+------+-----------------+-----------+---------+-------+--------------+------------------------------------------------------------------------------------------------------------------+
| Id   | User | Host            | db        | Command | Time  | State        | Info                                                                                                             |
+------+------+-----------------+-----------+---------+-------+--------------+------------------------------------------------------------------------------------------------------------------+
| 186  | root | localhost:44750 | NULL      | Sleep   | 21391 | cleaning up  | PLUGIN                                                                                                           |
| 2290 | root | localhost       | wikistats | Sleep   | 1417  |              | NULL                                                                                                             |
| 2510 | root | localhost:41737 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p0) where url like '%mysql%' |
| 2511 | root | localhost:41738 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p1) where url like '%mysql%' |
| 2512 | root | localhost:41739 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p2) where url like '%mysql%' |
| 2513 | root | localhost:41741 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p4) where url like '%mysql%' |
| 2514 | root | localhost:41740 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p3) where url like '%mysql%' |
| 2515 | root | localhost:41742 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p5) where url like '%mysql%' |
| 2516 | root | localhost:41743 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p6) where url like '%mysql%' |
| 2517 | root | localhost:41744 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p7) where url like '%mysql%' |
| 2518 | root | localhost:41745 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p8) where url like '%mysql%' |
| 2519 | root | localhost:41746 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p9) where url like '%mysql%' |
| 2520 | root | localhost:41747 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p10) where url like '%mysql%'|
| 2521 | root | localhost:41748 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p11) where url like '%mysql%'|
| 2522 | root | localhost:41749 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p12) where url like '%mysql%'|
| 2523 | root | localhost:41750 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p13) where url like '%mysql%'|
| 2524 | root | localhost:41751 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p14) where url like '%mysql%'|
| 2525 | root | localhost:41752 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p15) where url like '%mysql%'|
| 2526 | root | localhost:41753 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p16) where url like '%mysql%'|
| 2527 | root | localhost:41754 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p17) where url like '%mysql%'|
| 2528 | root | localhost:41755 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p18) where url like '%mysql%'|
| 2529 | root | localhost:41756 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p19) where url like '%mysql%'|
| 2530 | root | localhost:41757 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p20) where url like '%mysql%'|
| 2531 | root | localhost:41758 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p21) where url like '%mysql%'|
| 2532 | root | localhost:41759 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p22) where url like '%mysql%'|
| 2533 | root | localhost:41760 | NULL      | Query   | 2     | Sending data | PLUGIN: select sum(tot_visits) from wikistats.wikistats_by_day_spark_part partition(p23) where url like '%mysql%'|
| 2534 | root | localhost       | NULL      | Query   | 0     | starting     | show full processlist                                                                                            |
+------+------+-----------------+-----------+---------+-------+--------------+------------------------------------------------------------------------------------------------------------------+
```
And CPU utilization:
```
Tasks:  41 total,   1 running,  33 sleeping,   7 stopped,   0 zombie
%Cpu0  : 91.9 us,  1.7 sy,  0.0 ni,  6.4 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu1  : 97.3 us,  2.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  : 97.0 us,  3.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  : 97.7 us,  2.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu4  : 95.7 us,  2.7 sy,  0.0 ni,  1.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu5  : 98.3 us,  1.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu6  : 98.3 us,  1.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu7  : 97.7 us,  2.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu8  : 96.7 us,  3.0 sy,  0.0 ni,  0.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu9  : 98.3 us,  1.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu10 : 95.7 us,  4.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu11 : 97.7 us,  2.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu12 : 98.0 us,  2.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu13 : 98.0 us,  1.7 sy,  0.0 ni,  0.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu14 : 97.7 us,  2.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu15 : 97.3 us,  2.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu16 : 98.0 us,  2.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu17 :100.0 us,  0.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu18 : 97.3 us,  2.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu19 : 98.7 us,  1.3 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu20 : 99.3 us,  0.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu21 : 97.3 us,  2.3 sy,  0.0 ni,  0.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu22 : 97.0 us,  3.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu23 : 96.0 us,  4.0 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
...
  PID USER   PR NI    VIRT    RES  SHR S  %CPU %MEM     TIME+ COMMAND
18901 mysql  20  0 25.843g 0.017t 7808 S  2386 37.0 295:34.05 mysqld
```
Now, here is our “salad”:
```
p1:2499
p23:2366
p2:2297
p0:4735
p12:12349
p14:1412
p3:2045
p16:4157
p20:3160
p18:8717
p17:2967
p13:4519
p15:5462
p10:1312
p5:2815
p7:4644
p9:766
p4:3218
p6:4175
p21:2958
p8:929
p19:4182
p22:3231
p11:4020
```
As we can see, the partitions return in random order. If needed, we can even sort the result array (not needed in this example, as we only care about the total). Finally, our result and timing:
```
All done! Total: 88935

real	0m30.668s
user	0m0.256s
sys	0m0.028s
```
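If we did want to sort the "salad", the result strings can be ordered by their numeric partition index. A small sketch (the sample values are taken from the output above; partNum is a made-up helper):

```javascript
// Sort "p<N>:<sum>" result strings by their numeric partition index,
// so that p2 comes before p12 (a plain string sort would not do that).
var res = ['p1:2499', 'p23:2366', 'p2:2297', 'p0:4735', 'p12:12349'];

function partNum(s) {
  return Number(s.slice(1, s.indexOf(':'))); // "p12:12349" -> 12
}

res.sort(function (a, b) { return partNum(a) - partNum(b); });
console.log(res.join(' ')); // p0:4735 p1:2499 p2:2297 p12:12349 p23:2366
```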
Timing and Results
- Original query, single thread: 5 minutes
- Modified query, 24 threads in NodeJS: 30 seconds
- Performance increase: 10x
If you are interested in the original question (MySQL versus PostgreSQL, Jan 2008):
- MySQL, total visits: 88935
- PostgreSQL, total visits: 17753
Further Reading:
- NodeJS connector for MySQL: Getting Started
- X Plugin overview
- Connector/Node.js example in MySQL 5.7
- Callback functions concept
PS: Original Asynchronous Salad Joke, by Vlad @Crazy_Owl (in Russian)
Just as a sort of side note: I think this is “pipelining” ( https://dev.mysql.com/doc/internals/en/x-protocol-lifecycle-pipelining.html ) rather than strictly-speaking “parallelized” (though obviously the queries themselves are in parallel across the partitions). You get results from NodeJS in random order, but AFAICT from playing with the X Protocol in Perl, the queries sent back over the socket are returned in the same order as you sent them. So if, for example, you sent the 1st query and it takes the server 5 minutes, while the remaining 23 queries take 30 seconds each, you’d still only get the results after 5 minutes. At least that’s my hypothesis. 🙂
The example is parallelizing; mind that he’s using different connections, leading to different server threads. But you are right – with pipelining the order is kept. The server will respond in the queried order.
Scott, yes, I’m opening 24 connections to MySQL, similar to how map/reduce works.
Thanks for the example. It shows clearly how to manipulate queries and results in JS.
However, I am puzzled by the method that you chose. The parallelization is possible because partitions in MySQL 5.7 don’t lock the whole table as they did before (https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations-locking.html).
The same result can be achieved by using a shell script that runs N queries (with the regular MySQL client) in the background and reports the results to a text file, which is then summarized.
The benefit that I see in your solution is only the ability of running parallelized queries in a clean syntax without being a wizard of parallel execution with background processes.
For the purpose of understanding the technology better (I am still exploring its capabilities) could you show an example, even without code, that produces benefits without using partitions?
Giuseppe, yes, you are right, and this was confusing. The parallelization in NodeJS is just much easier, and very similar to what I did a year ago with a simple shell script.
Unfortunately, pipelining with the X Plugin (http://mysqlserverteam.com/mysql-5-7-12-part-2-improving-the-mysql-protocol/) does not give much better performance, as it still runs all queries in 1 thread and only saves the round trips. (In the next blog post I’m going to show how it can be beneficial, though.)
Here is the timing:
1. Pipeline with NodeJS:

```
$ time node async_wikistats_pipeline.js
…
All done! Total: 17753

real	5m39.666s
user	0m0.212s
sys	0m0.024s
```
2. Direct query – partitioned table:
```
mysql> select sum(tot_visits) from wikistats.wikistats_by_day_spark_part where url like '%postgresql%';
+-----------------+
| sum(tot_visits) |
+-----------------+
|           17753 |
+-----------------+
1 row in set (5 min 31.44 sec)
```
3. Direct query – non-partitioned table:
```
mysql> select sum(tot_visits) from wikistats.wikistats_by_day_spark where url like '%postgresql%';
+-----------------+
| sum(tot_visits) |
+-----------------+
|           17753 |
+-----------------+
1 row in set (4 min 38.16 sec)
```
With pipelining in NodeJS I’m reusing the same connection (and do not open a new one for each thread).
I wish that with pipelining the X Plugin could open a number of connections :)
For example:
var conn = mysqlx.getNodeSession( cs,
Then the X Plugin would run queries in parallel across those connections.