In an earlier post I covered using the pymongo module to work with a MongoDB database, but the connection configuration is slightly different when you are talking to a MongoDB cluster.
To connect to a MongoDB cluster, use a connection string like the following:
client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017')
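For reference, the same connection can also be built with keyword arguments instead of a URI. This is only a sketch, reusing the placeholder hosts and credentials from the line above and assuming pymongo 3.x or newer:
from pymongo import MongoClient

client = MongoClient(
    host=["mongodb-0.mongodb-headless.namespace:27017",
          "mongodb-1.mongodb-headless.namespace:27017",
          "mongodb-2.mongodb-headless.namespace:27017"],
    username="root",
    password="123456")
# Quick connectivity check against the cluster
print(client.admin.command("ping"))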
You can also set a read preference on the connection, so that reads are routed to the cluster nodes you specify:
client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017/?readPreference=secondary')
readPreference controls how the client driver routes read operations across a replica set. It determines which nodes serve reads, which can improve read throughput and availability.
It supports the following modes (a short pymongo sketch for setting the read preference in code follows the list):
(1) primary (default): read only from the primary node.
(2) primaryPreferred: prefer the primary; fall back to a secondary when the primary is unavailable.
(3) secondary: read only from secondary nodes of the replica set.
(4) secondaryPreferred: prefer secondaries; fall back to the primary when no secondary is available.
(5) nearest: read from the nearest node, determined by the ping time between the client and the servers.
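Besides putting readPreference in the URI, pymongo also lets you set it in code. A minimal sketch, where the database name "data" and the credentials are just the placeholders from the examples above:
from pymongo import MongoClient, ReadPreference

client = MongoClient(
    "mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,"
    "mongodb-1.mongodb-headless.namespace:27017,"
    "mongodb-2.mongodb-headless.namespace:27017")

# Route reads on this database handle to secondaries, falling back to the primary
db = client.get_database("data", read_preference=ReadPreference.SECONDARY_PREFERRED)
print(db.jobs.estimated_document_count())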
The main difference is only in the connection configuration; the remaining operations are the same as in my earlier notes: https://sulao.cn/post/552.html
Below is the code I used:
#!/usr/bin/python3
#coding: utf-8
from pymongo import MongoClient
import json
import math
import csv
import time
import datetime
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(filename)s[line:%(lineno)d] %(message)s',
                    datefmt='%Y-%m-%d')

page_size = 1000
client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017')
data = client["data"]
jobs = data["jobs"]

#query = {"duration": {"$lt": 2}}
query = {"$and": [{"duration": {"$lt": 2}}, {"endTime": {"$gt": datetime.datetime(2023, 10, 1, 00, 00, 00)}}]}

# Count the matching documents and work out how many pages to export
count = jobs.count_documents(query)
total_page = math.ceil(count/page_size)
logging.info("Total Documents: {}, Total number of pages: {} !".format(count, total_page))

with open("jobs.csv", "w", newline='') as csvfile:
    cvwrite = csv.writer(csvfile)
    cvwrite.writerow(["allocationID", "allocationID", "groupID", "duration", "k8spodname"])
    for p in range(1, total_page+1):
        logging.info("Exporting page {}......".format(p))
        start = (p-1)*page_size
        try:
            # Page through the result set, sorted by startTime
            ret = jobs.find(query).sort([("startTime", 1)]).limit(page_size).skip(start)
            for d in ret:
                if "full" in d:
                    k8spodname = d["full"]["meta"].get("k8spodname", "")
                else:
                    k8spodname = ""
                    logging.warning("Pod name not found, allocation_id: {}".format(d.get("allocationID", "")))
                cvwrite.writerow([d.get("allocationID", ""), d.get("allocationID", ""), d.get("groupID", ""), d.get("duration", ""), k8spodname])
            logging.info("Page {} export succeed !!!".format(p))
        except Exception as e:
            logging.error("Page {} export failed , {} ???".format(p, e))
        time.sleep(0.5)
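For a bulk export like this, the read preference from the first half of the post can be combined with the same script. A hypothetical variant of the connection line above that keeps the heavy reads off the primary (assuming the secondaries are reachable from the client and acceptable replication lag):
client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,'
                     'mongodb-1.mongodb-headless.namespace:27017,'
                     'mongodb-2.mongodb-headless.namespace:27017/?readPreference=secondaryPreferred')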