import time
import pymysql
import os
from elasticsearch import Elasticsearch
from elasticsearch import helpers


host = 'http://192.168.45.128:9200'
es = Elasticsearch(host)
conn = pymysql.connect(host="localhost", user="root",password="root", database="tp6_gov_cn", charset='utf8mb4')
cs = conn.cursor(cursor=pymysql.cursors.DictCursor)

database = 'wzyq_admin_menus'  #数据库名称和ES索引名字一样
find = '*'  #导入指定索引或数据库同步字段名称
limit = '20000000' #每次导入数量  


def sync_main():

    t1 = time.time()
    #
    sel = f'select {find} from {database} where qy <> 2  order by id limit {limit}'
    t = cs.execute(sel)
    conn.commit()
    if not t:
        print('----------------------------')
        print(database, '数据库内暂无同步数据')
        print('----------------------------')
        return False
    lists = cs.fetchall()
    action = []
    ids = []
    for item in lists:
        id = item.pop("id")
        # 删除不要的字段
        data = item
        ids.append(id)
        aa = {
            "_index": database,
            "_id": id,
            "_source": data
        }
        action.append(aa)
    try:
        bulk_re = helpers.bulk(es, action)
    except Exception as e:
        print('批量操作:', e, file='err')
        return False
    if bulk_re[1]:
        print('批量操作:', bulk_re, file='err')
        print('批量操作:', ids, file='err')
        return False
    # 修改mysql表状态
    sql = f'update {database} set qy = 2 where id in ({",".join(map(str, ids))})'
    cs.execute(sql)
    conn.commit()
    t2 = time.time()
    print(database,'数据库导入耗时:',t2-t1,'总导入条数:',t)
    return True

if __name__ == '__main__':
    sync_main()


python 批量mysql数据导入Elasticsearch 200W条数据 1分钟导入

唯一是mysql 与ES 数据库字段及数据库名称必须一直