使用Pymongo操作MongoDB集群

之前我们记录过使用pymongo模块操作mongodb数据库,但是操作mongodb集群的话连接配置有一些差异。

连接mongodb集群需要使用下面的配置方式连接

client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017')

还可以设置读取的路由连接操作,这样连接主要操作设置的mongodb集群节点

client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017/?readPreference=secondary')

readPreference 主要控制客户端driver从副本集(Replica Set)读数据的时候如何路由,它可以设置数据读取方式和读取节点,提高数据读取的效率和可靠性。

它有以下几个配置以及释义:
(1)primary:只主(默认模式)。只从primary节点读数据。
(2)primaryPreferred:先主后从。优先从primary读取,primary不可用时从secondary读。
(3)secondary:只从。只从副本集中secondary节点读数据。
(4)secondaryPreferred:先从后主。优先从secondary读取,如果secondary不可用时就从primary读。
(5)nearest:就近。根据网络距离,就近读取,根据客户端与服务端的PingTime是实现。

主要是配置连接上的不通,其它的操作可以查看以前的笔记:https://sulao.cn/post/552.html

下面是我的操作代码

#!/usr/bin/pyhton3
#coding: utf-8
from pymongo import MongoClient
import json
import math
import csv
import time
import datetime
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(filename)s[line:%(lineno)d] %(message)s', datefmt='%Y-%m-%d')

page_size = 1000

client = MongoClient('mongodb://root:123456@mongodb-0.mongodb-headless.namespace:27017,mongodb-1.mongodb-headless.namespace:27017,mongodb-2.mongodb-headless.namespace:27017')
data = client["data"]
jobs = data["jobs"]
#query = {"duration": {"$lt": 2}}
query = {"$and": [{ "duration": {"$lt": 2}}, {"endTime": {"$gt": datetime.datetime(2023, 10, 1, 00, 00, 00)}}]}

count = jobs.count_documents(query)
total_page = math.ceil(count/page_size)
logging.info("Total Documents: {}, Total number of pages: {} !".format(count, total_page))
with open("jobs.csv", "w", newline='') as csvfile:
    cvwrite = csv.writer(csvfile)
    cvwrite.writerow(["allocationID", "allocationID", "groupID", "duration", "k8spodname"])

    for p in range(1, total_page+1):
        logging.info("Exporting page {}......".format(p))
        start = (p-1)*page_size
        try:
            ret = jobs.find(query).sort([("startTime", 1)]).limit(page_size).skip(start)
            for d in ret:
                if "full" in d:
                    k8spodname = d["full"]["meta"].get("k8spodname", "")
                else:
                    k8spodname = ""
                    logging.warning("Not found podname, alloction_id: {}".format(d.get("allocationID", "")))
                cvwrite.writerow([d.get("allocationID", ""), d.get("allocationID", ""), d.get("groupID", ""), d.get("duration", ""), k8spodname])
            logging.info("Page {} export succeed !!!".format(p))
        except Exception as e:
            logging.error("Page {} export failed , {} ???".format(p, e))
        time.sleep(0.5)


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://sulao.cn/post/930.html