import time
import sys
import pymysql
from elasticsearch import Elasticsearch
from elasticsearch import helpers

host = 'http://192.168.45.128:9200'
es = Elasticsearch(host)
conn = pymysql.connect(host="localhost", user="root", password="root",
                       database="tp6_gov_cn", charset='utf8mb4')
cs = conn.cursor(cursor=pymysql.cursors.DictCursor)

database = 'wzyq_admin_menus'  # MySQL table name; must be the same as the ES index name
find = '*'                     # columns to sync (field names must match the ES index)
limit = '20000000'             # maximum number of rows per import


def sync_main():
    t1 = time.time()
    # fetch the rows that have not been synced yet (qy <> 2)
    sel = f'select {find} from {database} where qy <> 2 order by id limit {limit}'
    t = cs.execute(sel)
    conn.commit()
    if not t:
        print('----------------------------')
        print(database, 'has no data to sync')
        print('----------------------------')
        return False

    lists = cs.fetchall()
    action = []
    ids = []
    for item in lists:
        id = item.pop("id")  # drop the id column from the document body
        data = item
        ids.append(id)
        aa = {
            "_index": database,
            "_id": id,
            "_source": data
        }
        action.append(aa)

    try:
        bulk_re = helpers.bulk(es, action)
    except Exception as e:
        print('bulk error:', e, file=sys.stderr)
        return False
    if bulk_re[1]:  # helpers.bulk returns (success_count, errors)
        print('bulk errors:', bulk_re, file=sys.stderr)
        print('failed ids:', ids, file=sys.stderr)
        return False

    # mark the rows as synced in MySQL
    sql = f'update {database} set qy = 2 where id in ({",".join(map(str, ids))})'
    cs.execute(sql)
    conn.commit()

    t2 = time.time()
    print(database, 'import took:', t2 - t1, 's, rows imported:', t)
    return True


if __name__ == '__main__':
    sync_main()
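For larger volumes, the single helpers.bulk call above builds the whole action list in memory before sending it. A minimal sketch of an alternative, using the elasticsearch helpers.parallel_bulk generator to send the documents in chunks across several threads, is shown below; it assumes the es, cs, conn, database and find objects defined in the script above, and the thread_count/chunk_size values are illustrative, not tuned.

from elasticsearch import helpers

def sync_streaming(batch='20000000'):
    # same selection criterion as sync_main(): qy <> 2 means "not yet synced"
    cs.execute(f'select {find} from {database} where qy <> 2 order by id limit {batch}')
    ids = []

    def gen_actions():
        # rows are still fetched into memory by the cursor; the gain here is
        # that indexing requests are chunked and sent in parallel
        for row in cs.fetchall():
            row_id = row.pop('id')
            ids.append(row_id)
            yield {"_index": database, "_id": row_id, "_source": row}

    # parallel_bulk returns a generator of (ok, info) tuples and must be consumed
    for ok, info in helpers.parallel_bulk(es, gen_actions(), thread_count=4, chunk_size=2000):
        if not ok:
            print('bulk item failed:', info)

    if ids:
        cs.execute(f'update {database} set qy = 2 where id in ({",".join(map(str, ids))})')
        conn.commit()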
Bulk-importing MySQL data into Elasticsearch with Python this way takes about one minute for roughly 2,000,000 rows.
The only requirement is that the MySQL field names and table name are identical to the field names and index name in Elasticsearch.
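Because the index name equals the table name, the sync is easy to sanity-check by comparing counts on both sides. A minimal sketch, assuming the es, cs and database objects from the script above and the qy = 2 "already synced" convention used there:

def check_counts():
    # rows that the script has already marked as synced
    cs.execute(f'select count(*) as c from {database} where qy = 2')
    mysql_count = cs.fetchone()['c']

    # make freshly indexed documents visible to the count, then count them
    es.indices.refresh(index=database)
    es_count = es.count(index=database)['count']

    print('mysql synced rows:', mysql_count, 'es documents:', es_count)
    return mysql_count == es_count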