Low InnoDB writes per second from AWS EC2 to MySQL RDS using Python

I have around 60GB of JSON files that I am parsing using Python and then inserting into a MySQL database using the Python-MySQL Connector. Each JSON file is around 500MB.

I have been using an AWS r3.xlarge EC2 instance with a secondary volume to hold the 60GB of JSON data.

I am then using an AWS RDS r3.xlarge MySQL instance. These instances are all in the same region and availability zone. The EC2 instance is using the following Python script to load the JSON, parse it and then insert it into the MySQL RDS. My python:

import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os

os.chdir("./json_data")

for file in glob.glob("*.json"):
    with open(file, 'r') as data_file:
        results = json.load(data_file)
        print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='',
        host='')

    cursor = cnx.cursor(buffered=True)

    DB_NAME = 'DB'

    def create_database(cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    try:
        cnx.database = DB_NAME    
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = ("INSERT INTO master" 
        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = ("INSERT INTO polyline"
        "(Overview_polyline, request_no)"
        "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = ("INSERT INTO summary"
        "(summary, request_no)"
        "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = ("INSERT INTO warnings"
        "(warnings, request_no)"
        "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = ("INSERT INTO waypoint_order"
        "(waypoint_order, request_no)"
        "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = ("INSERT INTO leg_data"
        "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
        "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
    error_messages = []
    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                try: 
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": leg['dtf']['value'],
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": leg['start_address'],
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)
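                    # Look up the request_no of the row just inserted by matching its unique fields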
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
                except KeyError as e:
                    error_messages.append(e)
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": "000",
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": 'unknown',
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
            for overview_polyline in result['routes']:
                params = {
                "request_no": request_no,
                "Overview_polyline": overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for summary in result['routes']:
                params = {
                "request_no": request_no,
                "summary": summary['summary']
                }
                cursor.execute(add_summary, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for warnings in result['routes']:
                params = {
                "request_no": request_no,
                "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for waypoint_order in result['routes']:
                params = {
                "request_no": request_no,
                "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                "request_no": request_no,
                "leg_dt": steps['dt']['value'],
                "leg_ds": steps['ds']['value'],
                "leg_O_l": steps['start_location']['lat'],
                "leg_O_ln": steps['start_location']['lng'],
                "leg_D_l": steps['end_location']['lat'],
                "leg_D_ln": steps['end_location']['lng'],
                "leg_html_inst": steps['html_instructions'],
                "leg_polyline": steps['polyline']['points'],
                "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)
        cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished ' + file)

Using htop on the Linux Instance I can see the following:

Regarding the MySQL database, using MySQL Workbench I can see that:

This Python script has been chugging away for days, but I have only inserted around 20% of the data into MySQL.

My questions: how can I identify the bottleneck? Is it the Python script? It appears to be using a low amount of memory; can I increase this? I have checked the InnoDB buffer pool size as per (How to improve the speed of InnoDB writes per second of MySQL DB) and found it to be large:

SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
|               11674845184 |
+---------------------------+

Since I am using an RDS and an EC2 instance in the same region, I don't believe there is a network bottleneck. Pointers on where I should look for the biggest savings would be very welcome!

EDIT

I think I may have stumbled upon the issue. For efficiency during parsing, I am writing each level of the JSON separately. However, I then have to execute a query to match a nested part of the JSON with its higher level. This query has a low overhead on small databases, but I've noticed that the speed of the inserts has decreased dramatically on this db: it has to search a larger and ever-growing db to properly connect the JSON data.

I am not sure how to solve this other than waiting it out....

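One possibility, sketched below as an assumption since the table definitions are not shown: if master.request_no is an AUTO_INCREMENT primary key, Connector/Python exposes the id generated by the last INSERT as cursor.lastrowid, so the matching SELECT against the ever-growing table could be dropped entirely.

cursor.execute(add_overall_data, params)
# lastrowid is the AUTO_INCREMENT id generated by the INSERT above,
# so no SELECT against the growing master table is needed
request_no = cursor.lastrowid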

3 Answers

#1

I cannot see any table definitions in the Python script, but when we do large data operations we always disable any database indexes while loading into MySQL. If you have any constraints or foreign key enforcement, those should be disabled during the load as well.

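A minimal sketch of those session-level switches with Connector/Python; unique_checks and foreign_key_checks are standard MySQL system variables, but whether they are safe to turn off depends on your data:

cursor.execute("SET unique_checks = 0")       # skip unique-index validation during the load
cursor.execute("SET foreign_key_checks = 0")  # skip FK validation during the load
# ... run the bulk inserts here ...
cursor.execute("SET unique_checks = 1")       # re-enable once loading is done
cursor.execute("SET foreign_key_checks = 1")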

Autocommit is disabled by default when connecting through Connector/Python.

But I cannot see any commit options in the code you present.

To Summarise

Disable/Remove (For Loading)

-- Indexes
-- Constraints
-- Foreign Keys
-- Triggers

In your Loading Program

-- Disable autocommit
-- Commit every N records (N will depend upon your available buffer size)

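A minimal sketch of the commit-every-N pattern, assuming the connection keeps Connector/Python's default of autocommit off; BATCH_SIZE is an arbitrary placeholder to tune:

BATCH_SIZE = 1000              # tune to your available buffer size
pending = 0
for result in results:
    # ... the cursor.execute(...) inserts from the question go here ...
    pending += 1
    if pending >= BATCH_SIZE:
        cnx.commit()           # flush one batch as a single transaction
        pending = 0
cnx.commit()                   # commit the final partial batch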

#2

My English is poor. If I were doing this work, I would:

  1. Use Python to convert the JSON to a flat text file.

  2. Use the mysqlimport tool to import the text file into MySQL.

If you must do Python + MySQL all in one, I suggest using multi-row inserts:

INSERT INTO table VALUES (1), (2), ... (xxx)
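
A small sketch of that idea with Connector/Python: for a plain INSERT ... VALUES statement, cursor.executemany() batches the rows into a single multi-row INSERT, which cuts the per-row round trips. The summary table and its columns are taken from the question:

add_summary_many = ("INSERT INTO summary (summary, request_no) "
                    "VALUES (%s, %s)")
batch = [(route['summary'], request_no) for route in result['routes']]
cursor.executemany(add_summary_many, batch)  # sent as one multi-row INSERT
cnx.commit()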

Also, why does 'SELECT request_no FROM master' appear so many times? That value should be read from the JSON instead.

My English is very poor, so...

#3

Given this information, it looks like both the script and the DB are mostly idle. Tweaking anything at the MySQL level would be premature.

You need more visibility into what your program is doing.

Start by logging how much time each of your queries takes, how many errors you get and so on.

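A minimal sketch of that kind of instrumentation, wrapping each statement in a timer (the 0.1 second threshold is an arbitrary placeholder):

import time

def timed_execute(cursor, stmt, params=None):
    # Time every statement so the slow ones stand out in the logs.
    start = time.perf_counter()
    cursor.execute(stmt, params)
    elapsed = time.perf_counter() - start
    if elapsed > 0.1:
        print('slow query ({:.3f}s): {}'.format(elapsed, stmt[:60]))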

These SELECTs may need an index added to perform well, if they are a problem at all.

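For the repeated lookup in the question, a composite index covering its WHERE clause would let MySQL seek instead of scanning the growing table; the column names are taken from the question's query, and idx_master_lookup is just a placeholder name:

cursor.execute(
    "CREATE INDEX idx_master_lookup "
    "ON master (O_l, O_ln, D_l, D_ln, _sent_time_stamp)")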

