暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python 实现 Jenkins 凭据批量录入

zayki 2024-04-30
365

啥也不唠了,直接上代码,瑕疵之处望指教。

# -*- coding: utf-8 -*-
"""PyQt5 tool that bulk-imports Jenkins credentials from an Excel sheet.

For every row of the selected ``.xlsx`` file the tool creates (or updates) a
username/password credential in the Jenkins credential domain
``远程服务器凭据`` and, when the check box is ticked, registers the host in the
"Publish over SSH" plugin through the Jenkins script console.

The Jenkins URL, user name and (XOR-obfuscated) password are persisted in
``init.ini`` (a JSON document) next to the script.
"""
import base64 as b64
import ipaddress
import json
import os
import socket
import sys
import threading
import time

import pandas as pd
import paramiko
import requests
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import Qt, QRectF, QDir
from PyQt5.QtGui import QIcon, QPainter, QPainterPath, QBrush, QColor, QCursor
from PyQt5.QtWidgets import QWidget, QApplication, QMessageBox, QFileDialog

CONFIG_PATH = "./init.ini"
DEFAULT_JENKINS_URL = "http://10.10.10.70:8080"
CREDENTIAL_DOMAIN = '远程服务器凭据'
# NOTE: XOR with a fixed key is obfuscation, not encryption. Anyone who can
# read init.ini can recover the password.
XOR_KEY = "superzyj"
# Seconds to wait for the Jenkins script console before giving up.
SCRIPT_TIMEOUT = 10


def _load_config():
    """Return the JSON object stored in init.ini, or {} if it is missing/invalid."""
    try:
        with open(CONFIG_PATH, "r") as file:
            loaded = json.load(file)
    except (OSError, ValueError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _xor_with_key(text, key):
    """XOR every character of *text* with the (cyclically repeated) *key*."""
    key_length = len(key)
    return "".join(
        chr(ord(char) ^ ord(key[index % key_length]))
        for index, char in enumerate(text)
    )


def _decode_password(stored):
    """Decode a password written by ``xor_encrypt``; return '' when unreadable."""
    try:
        return _xor_with_key(b64.b64decode(stored.encode()).decode(), XOR_KEY)
    except (ValueError, AttributeError):
        return ""


def _saved_login():
    """Return ``(url, username, password)`` from init.ini with safe defaults."""
    config = _load_config()
    return (
        config.get('url', DEFAULT_JENKINS_URL),
        config.get('username', ''),
        _decode_password(config.get('password', '')),
    )


def _groovy_literal(value):
    """Render *value* as a single-quoted Groovy string literal.

    Single-quoted Groovy strings do not interpolate ``$``; backslashes, quotes
    and line breaks are escaped so user data cannot terminate the literal.
    """
    escaped = (
        str(value)
        .replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return "'" + escaped + "'"


def _make_font(point_size, bold=False):
    """Build the 微软雅黑 font used by all labels and buttons."""
    font = QtGui.QFont()
    font.setFamily("微软雅黑")
    font.setPointSize(point_size)
    font.setBold(bold)
    font.setWeight(75 if bold else 50)
    return font


def _show_message(title, text):
    """Show a modal message box. Must be called from the GUI thread."""
    messagebox = QMessageBox()
    messagebox.setWindowIcon(QIcon('resources/jenkins.ico'))
    messagebox.setWindowTitle(title)
    messagebox.setStyleSheet(button_style)
    messagebox.setText(text)
    messagebox.addButton(QtWidgets.QPushButton('确定'), QMessageBox.YesRole)
    messagebox.exec_()


jenkins_url = _load_config().get('url', DEFAULT_JENKINS_URL)


def exitapp():
    """Terminate the application."""
    sys.exit(0)


button_style = '''
    QPushButton {text-align : center; background-color : white; font: bold; border-color: gray; border-width: 1px; border-radius: 3px; padding: 2px; height : 14px; border-style: outset; font : 12px;}
    QPushButton:hover {text-align : center; background-color : cyan; font: bold; border-color: gray; border-width: 1px; border-radius: 5px; padding: 1px; height : 14px; border-style: outset; font : 12px;}
    QPushButton:pressed {text-align : center; background-color : cyan; font: bold; border-color: gray; border-width: 1px; border-radius: 5px; padding: 1px; height : 14px; border-style: outset; font : 12px;}
'''


class RoundShadow(QWidget):
    """Frameless, translucent window with rounded corners that can be dragged."""

    def __init__(self, parent=None):
        super(RoundShadow, self).__init__(parent)
        self.border_width = 8
        # Drag state; initialised here so mouseMoveEvent never sees it unset.
        self.m_flag = False
        self.m_Position = None
        # A frameless window with a translucent background is required for
        # the rounded corners to show.
        self.setWindowOpacity(0.9)
        self.setAttribute(Qt.WA_TranslucentBackground)
        self.setWindowFlags(QtCore.Qt.FramelessWindowHint)

    def paintEvent(self, event):
        """Draw ten fading rounded outlines (shadow) and the cyan body."""
        path = QPainterPath()
        path.setFillRule(Qt.WindingFill)
        pat = QPainter(self)
        pat.setRenderHint(pat.Antialiasing)
        pat.fillPath(path, QBrush(Qt.white))
        color = QColor(192, 192, 192, 50)
        for i in range(10):
            i_path = QPainterPath()
            i_path.setFillRule(Qt.WindingFill)
            inset = 10 - i
            ref = QRectF(inset, inset,
                         self.width() - inset * 2, self.height() - inset * 2)
            i_path.addRoundedRect(ref, self.border_width, self.border_width)
            color.setAlpha(int(150 - i ** 0.5 * 50))
            pat.setPen(color)
            pat.drawPath(i_path)

        # Rounded body.
        pat2 = QPainter(self)
        pat2.setRenderHint(pat2.Antialiasing)
        pat2.setBrush(Qt.cyan)
        pat2.setPen(Qt.transparent)
        rect = self.rect()
        rect.setLeft(-1)
        rect.setTop(-1)
        rect.setWidth(rect.width() - 1)
        rect.setHeight(rect.height() - 1)
        pat2.drawRoundedRect(rect, 8, 8)

    def mousePressEvent(self, event):
        """Start dragging the window on a left-button press."""
        if event.button() == Qt.LeftButton:
            self.m_flag = True
            # Offset of the cursor relative to the window's top-left corner.
            self.m_Position = event.globalPos() - self.pos()
            event.accept()
            self.setCursor(QCursor(Qt.OpenHandCursor))

    def mouseMoveEvent(self, QMouseEvent):
        """Move the window while the left button is held down."""
        if self.m_flag and QMouseEvent.buttons() & Qt.LeftButton:
            self.move(QMouseEvent.globalPos() - self.m_Position)
            QMouseEvent.accept()

    def mouseReleaseEvent(self, QMouseEvent):
        """Stop dragging and restore the arrow cursor."""
        self.m_flag = False
        self.setCursor(QCursor(Qt.ArrowCursor))


class jenkins_credentials:
    """Thin client for the Jenkins credentials store HTTP endpoints.

    No method touches the GUI, so all of them are safe to call from a worker
    thread. Methods that talk to Jenkins return ``None`` when the request
    could not be completed (connection error or timeout).
    """

    host = jenkins_url

    def updateEOSCredentials(self, host, userName, passworld, id, des,
                             jenkinsUserName, jenkinsPassworld):
        """Overwrite credential *id*; return the HTTP status code or ``None``."""
        host = host.strip()
        userName = userName.strip()
        url = (host + "/credentials/store/system/domain/" + CREDENTIAL_DOMAIN
               + "/credential/" + id + "/updateSubmit")
        payload = {
            "stapler-class": "com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl",
            "scope": "GLOBAL",
            "username": userName,
            "password": str(passworld),
            "$redact": "password",
            "id": id,
            "description": des,
        }
        # json.dumps (not str(dict)) so quotes/backslashes in values stay valid.
        data = {"json": json.dumps(payload)}
        try:
            result = requests.post(url, data=data,
                                   auth=(jenkinsUserName, jenkinsPassworld),
                                   timeout=3)
        except requests.RequestException:
            return None
        return result.status_code

    def createEOSCredentials(self, host, domain_name, userName, passworld, id,
                             des, jenkinsUserName, jenkinsPassworld):
        """Create credential *id*; return the HTTP status code or ``None``."""
        host = host.strip()
        userName = userName.strip()
        domain_name = domain_name or CREDENTIAL_DOMAIN
        url = (host + "/credentials/store/system/domain/" + domain_name
               + "/createCredentials")
        payload = {
            "": "0",
            "credentials": {
                "scope": "GLOBAL",
                "username": userName,
                "password": str(passworld),
                "$redact": "password",
                "id": id,
                "description": des,
                "stapler-class": "com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl",
                "$class": "com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl",
            },
        }
        data = {"json": json.dumps(payload)}
        try:
            result = requests.post(url, data=data,
                                   auth=(jenkinsUserName, jenkinsPassworld),
                                   timeout=3)
        except requests.RequestException:
            return None
        return result.status_code

    def deleteEOSCredentials(self, host, domain_name, id, jenkinsUserName,
                             jenkinsPassworld):
        """Delete credential *id*; return the HTTP status code or ``None``."""
        host = host.strip()
        domain_name = domain_name or CREDENTIAL_DOMAIN
        url = (host + "/credentials/store/system/domain/" + domain_name
               + "/credential/" + id + "/doDelete")
        try:
            result = requests.post(url,
                                   auth=(jenkinsUserName, jenkinsPassworld),
                                   timeout=5)
        except requests.RequestException:
            return None
        return result.status_code

    def searchEOSCredentials(self, host, domain_name, id, jenkinsUserName,
                             jenkinsPassworld):
        """Return True if credential *id* answers 200, False otherwise.

        Returns ``None`` when Jenkins could not be reached.
        """
        host = host.strip()
        domain_name = domain_name or CREDENTIAL_DOMAIN
        url = (host + "/credentials/store/system/domain/" + domain_name
               + "/credential/" + id + "/")
        try:
            response = requests.post(url,
                                     auth=(jenkinsUserName, jenkinsPassworld),
                                     timeout=3)
        except requests.RequestException:
            return None
        return response.status_code == 200


class SshHostConfiguration:
    """Manage "Publish over SSH" hosts through the Jenkins script console."""

    # Common head of every Groovy script sent to the console.
    _SCRIPT_PRELUDE = (
        "import jenkins.model.*\n"
        "import jenkins.plugins.publish_over_ssh.BapSshHostConfiguration\n"
        "def inst = Jenkins.getInstance()\n"
        'def publish_ssh = inst.getDescriptor("jenkins.plugins.publish_over_ssh.BapSshPublisherPlugin")\n'
    )

    def __init__(self):
        _, self.jenkinsUserName, self.jenkinsPassword = _saved_login()
        self.url = jenkins_url
        self.script_url = jenkins_url + '/script'
        # Host and Content-Length are deliberately absent: requests derives
        # both from the URL and the body, and hand-written values were wrong.
        self.headers = {
            'Cache-Control': 'max-age=0',
            'Upgrade-Insecure-Requests': '1',
            'Origin': self.url,
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Referer': self.script_url,
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'close',
        }

    def _run_script(self, script_body):
        """POST a Groovy script to the console and return the response.

        Raises ``requests.RequestException`` on connection errors/timeouts.
        """
        data = {"script": self._SCRIPT_PRELUDE + script_body}
        # verify=False: certificate checks are skipped, as in the original
        # tool. NOTE(review): enable verification if Jenkins has a trusted
        # certificate.
        return requests.post(self.script_url, headers=self.headers,
                             auth=(self.jenkinsUserName, self.jenkinsPassword),
                             data=data, verify=False, timeout=SCRIPT_TIMEOUT)

    def addHostConfiguration(self, Name, Hostname, Username, Remotedirectory,
                             Password):
        """Register an SSH host (port 22) and return the console response text.

        Raises ``requests.RequestException`` when Jenkins cannot be reached.
        """
        Hostname = Hostname.strip()
        Username = Username.strip()
        script = (
            "def configuration = new BapSshHostConfiguration("
            "{name}, {host}, {user}, {password}, {root}, "
            "22, 300000, true, '', '', false)\n"
            "publish_ssh.addHostConfiguration(configuration)\n"
            "publish_ssh.save()\n"
        ).format(
            name=_groovy_literal(Name),
            host=_groovy_literal(Hostname),
            user=_groovy_literal(Username),
            password=_groovy_literal(Password),
            root=_groovy_literal(Remotedirectory),
        )
        return self._run_script(script).text

    def removeHostConfiguration(self, Name):
        """Best-effort removal of host *Name*; return True if the request was sent."""
        script = (
            "publish_ssh.removeHostConfiguration({name})\n"
            "publish_ssh.save()\n"
        ).format(name=_groovy_literal(Name))
        try:
            self._run_script(script)
        except requests.RequestException:
            return False
        return True

    def getHostConfigurations(self):
        """Return the console response text listing the configured SSH hosts.

        Raises ``requests.RequestException`` when Jenkins cannot be reached.
        """
        return self._run_script("publish_ssh.getHostConfigurations()\n").text


class Ui_jenkinslogin(RoundShadow, QWidget):
    """Login dialog: validates the Jenkins account and stores it in init.ini."""

    def __init__(self):
        super(Ui_jenkinslogin, self).__init__()
        self.setupUi()

    def setupUi(self):
        """Create the widgets and pre-fill them from a previous login."""
        self.setObjectName("jenkinslogin")
        # self.setWindowIcon(QIcon('resources/jenkinslogin.ico'))
        self.setWindowModality(QtCore.Qt.WindowModal)
        self.setFixedSize(240, 150)

        self.label = QtWidgets.QLabel(self)
        self.label.setGeometry(QtCore.QRect(35, 31, 80, 16))
        self.label.setFont(_make_font(9))
        self.label.setObjectName("label")

        self.label_2 = QtWidgets.QLabel(self)
        self.label_2.setGeometry(QtCore.QRect(35, 71, 80, 16))
        self.label_2.setFont(_make_font(9))
        self.label_2.setObjectName("label_2")

        self.lineEdit = QtWidgets.QLineEdit(self)
        self.lineEdit.setGeometry(QtCore.QRect(71, 31, 133, 20))
        self.lineEdit.setObjectName("lineEdit")

        self.lineEdit_2 = QtWidgets.QLineEdit(self)
        self.lineEdit_2.setGeometry(QtCore.QRect(71, 71, 133, 20))
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.lineEdit_2.setEchoMode(QtWidgets.QLineEdit.Password)

        self.pushButton = QtWidgets.QPushButton(self)
        self.pushButton.setGeometry(QtCore.QRect(31, 111, 75, 23))
        self.pushButton.setFont(_make_font(10))
        self.pushButton.setObjectName("pushButton")
        self.pushButton.setStyleSheet(button_style)

        self.pushButton_2 = QtWidgets.QPushButton(self)
        self.pushButton_2.setGeometry(QtCore.QRect(125, 111, 75, 23))
        self.pushButton_2.setFont(_make_font(10))
        self.pushButton_2.setObjectName("pushButton_2")
        self.pushButton_2.setStyleSheet(button_style)

        self.retranslateUi(self)
        QtCore.QMetaObject.connectSlotsByName(self)
        # Connected here (not in retranslateUi) so that re-translating the UI
        # cannot connect the same slot twice.
        self.pushButton.clicked.connect(self.check_login)
        self.pushButton_2.clicked.connect(exitapp)

        self.url = jenkins_url
        self.initinfo = _load_config()
        if self.initinfo:
            self.url, username, password = _saved_login()
            self.lineEdit.setText(username)
            self.lineEdit_2.setText(password)

    def retranslateUi(self, Ui_jenkinslogin):
        """Set all user-visible texts of the dialog."""
        _translate = QtCore.QCoreApplication.translate
        Ui_jenkinslogin.setWindowTitle(_translate("jenkinslogin", "登录"))
        Ui_jenkinslogin.label.setText(_translate("jenkinslogin", "用户:"))
        Ui_jenkinslogin.label_2.setText(_translate("jenkinslogin", "口令:"))
        Ui_jenkinslogin.pushButton.setText(_translate("jenkinslogin", "登录"))
        Ui_jenkinslogin.pushButton_2.setText(_translate("jenkinslogin", "退出"))

    def xor_encrypt(self, tips, key):
        """Return *tips* XOR-ed with *key* and base64-encoded (obfuscation only)."""
        return b64.b64encode(_xor_with_key(tips, key).encode()).decode()

    def xor_decrypt(self, secret, key):
        """Inverse of ``xor_encrypt``; raises ``ValueError`` on invalid base64."""
        return _xor_with_key(b64.b64decode(secret.encode()).decode(), key)

    def secwindow(self):
        """Hide the login dialog and open the import window."""
        self.hide()
        self.f = Ui_ImportCreds()
        self.f.show()

    def messageBox(self, title, text):
        """Show a modal message box (GUI thread only)."""
        _show_message(title, text)

    def check_login(self):
        """Verify the entered account against Jenkins and persist it on success."""
        jenkinsUserName = self.lineEdit.text()
        jenkinsPassword = self.lineEdit_2.text()
        if not jenkinsUserName or not jenkinsPassword:
            self.messageBox("INFO", "Username and password cannot be empty!")
            return
        try:
            response = requests.post(self.url,
                                     auth=(jenkinsUserName, jenkinsPassword),
                                     timeout=3)
        except requests.RequestException:
            self.messageBox("INFO", "Connect timeout!")
            return
        if response.status_code != 200:
            self.messageBox("INFO", "Login failed,username or password is incorrect!")
            return
        initinfo = {"url": self.url,
                    "username": jenkinsUserName,
                    "password": self.xor_encrypt(jenkinsPassword, XOR_KEY)}
        try:
            with open(CONFIG_PATH, "w") as file:
                file.write(json.dumps(initinfo))
        except OSError as err:
            # The import window reads the account back from init.ini, so do
            # not continue with stale or missing data.
            self.messageBox("INFO", "Cannot write init.ini: {0}".format(err))
            return
        self.secwindow()


class Ui_ImportCreds(RoundShadow, QWidget):
    """Main window: import credentials / SSH hosts from an Excel file."""

    # Column positions in the Excel sheet.
    _COL_IP = 0
    _COL_USERNAME = 1
    _COL_PASSWORD = 6

    # Emitted by the worker thread; the connected slots run on the GUI thread,
    # which is the only thread allowed to touch widgets.
    log_message = QtCore.pyqtSignal(str)
    run_finished = QtCore.pyqtSignal()

    def __init__(self):
        super(Ui_ImportCreds, self).__init__()
        self.setupUi()
        self.url, self.jenkinsUserName, self.jenkinsPassword = _saved_login()
        self.domain_name = CREDENTIAL_DOMAIN
        self.log_message.connect(self._append_log)
        self.run_finished.connect(self._on_run_finished)
        # self.lineEdit.setPlainText("拖动华为云密码表到此处即可导入!")
        self.textBrowser.setText("使用说明:\n1.选择导入按钮或者拖动文件到输入框即可导入文件路径.\n"
                                 + "2.勾选复选框可以导入凭据的同时配置SSH授权。\n"
                                 + "3.配置文件init.ini中包含登录的url地址及用户名、密码,可以自己定义。\n")

    def setupUi(self):
        """Create the widgets and wire up their signals."""
        self.setObjectName("ImportCreds")
        self.resize(648, 327)

        self.toolButton = QtWidgets.QToolButton(self)
        self.toolButton.setGeometry(QtCore.QRect(10, 40, 41, 21))
        self.toolButton.setObjectName("toolButton")
        # Same look as the push buttons, applied to the QToolButton selector.
        self.toolButton.setStyleSheet(
            button_style.replace("QPushButton", "QToolButton"))

        self.textBrowser = QtWidgets.QTextBrowser(self)
        self.textBrowser.setGeometry(QtCore.QRect(10, 70, 631, 251))
        self.textBrowser.setObjectName("textBrowser")

        self.lineEdit = QtWidgets.QTextEdit(self)
        self.lineEdit.setGeometry(QtCore.QRect(60, 40, 491, 20))
        self.lineEdit.setObjectName("lineEdit")

        self.CheckBox = QtWidgets.QCheckBox(self)
        self.CheckBox.setGeometry(QtCore.QRect(560, 40, 21, 21))
        self.CheckBox.setObjectName("CheckBox")
        # setChecked takes a bool; setCheckState expects a Qt.CheckState.
        self.CheckBox.setChecked(True)

        self.pushButton = QtWidgets.QPushButton(self)
        self.pushButton.setGeometry(QtCore.QRect(600, 40, 41, 21))
        self.pushButton.setObjectName("pushButton")
        self.pushButton.setStyleSheet(button_style)

        self.pushButton_2 = QtWidgets.QPushButton(self)
        self.pushButton_2.setGeometry(QtCore.QRect(626, 1, 20, 20))
        self.pushButton_2.setObjectName("pushButton_2")
        self.pushButton_2.setStyleSheet(button_style)

        self.label = QtWidgets.QLabel(self)
        self.label.setGeometry(QtCore.QRect(180, 10, 251, 16))
        self.label.setFont(_make_font(12, bold=True))
        self.label.setToolTipDuration(1)
        self.label.setObjectName("label")

        self.retranslateUi(self)
        QtCore.QMetaObject.connectSlotsByName(self)
        self.pushButton_2.clicked.connect(exitapp)
        self.lineEdit.textChanged.connect(self.choosepath)
        self.toolButton.clicked.connect(self.button_click)
        self.pushButton.clicked.connect(self.exec)

    def retranslateUi(self, ImportCreds):
        """Set all user-visible texts of the window."""
        _translate = QtCore.QCoreApplication.translate
        ImportCreds.setWindowTitle(_translate("ImportCreds", "ImportCreds"))
        self.toolButton.setText(_translate("ImportCreds", "导入"))
        self.pushButton.setText(_translate("ImportCreds", "执行"))
        self.pushButton_2.setText(_translate("ImportCreds", "X"))
        self.label.setText(_translate("ImportCreds", "jenkins批量凭据及SSH授权配置工具"))

    def checkIP(self, IP):
        """Return True if *IP* is a dotted-quad IPv4 address.

        ``ipaddress`` is used instead of ``socket.inet_aton`` because the
        latter also accepts shorthand such as ``"1"`` or ``"127.1"``.
        """
        try:
            ipaddress.IPv4Address(str(IP))
        except ValueError:
            return False
        return True

    def checkSSH(self, ip, port, user, passwd):
        """Return True if an SSH password login to *ip*:*port* succeeds."""
        ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically (no host verification).
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(str(ip), int(port), user, str(passwd), timeout=0.5)
        except (paramiko.SSHException, OSError):
            return False
        finally:
            ssh.close()
        return True

    def choosepath(self):
        """Turn a dropped ``file:///`` URL into a plain local path."""
        text = self.lineEdit.toPlainText()
        if not text.startswith('file:///'):
            return
        # QUrl also decodes percent-escapes (e.g. %20) in dropped URLs.
        local_path = QtCore.QUrl(text.strip()).toLocalFile()
        self.lineEdit.setText(local_path or text.replace('file:///', ''))

    def button_click(self):
        """Let the user pick the Excel file, starting at the desktop folder."""
        desktop = os.path.join(os.path.expanduser("~"), "Desktop")
        absolute_path = QFileDialog.getOpenFileName(self, "Open file", desktop,
                                                    "Excel files (*.xlsx)")
        # An empty path means the dialog was cancelled: keep the current text.
        if absolute_path[0]:
            self.lineEdit.setText(absolute_path[0])

    def messageBox(self, title, text):
        """Show a modal message box (GUI thread only)."""
        _show_message(title, text)

    def exec(self):
        """Read the Excel file and start the import in a background thread."""
        self.textBrowser.clear()
        self.textBrowser.append("开始执行.....................")
        filename = self.lineEdit.toPlainText().strip()
        try:
            df = pd.read_excel(filename)
        except Exception as err:
            # UI boundary: read_excel can fail in many ways (missing file,
            # wrong format, missing engine); report instead of crashing.
            self.messageBox("INFO", "Failed to read Excel file: {0}".format(err))
            return
        if df.shape[1] <= self._COL_PASSWORD:
            self.messageBox("INFO", "The Excel file must contain at least {0} columns!".format(
                self._COL_PASSWORD + 1))
            return

        # Read the check box here, on the GUI thread, not inside the worker.
        configure_ssh = self.CheckBox.isChecked()
        # Prevent a second run from being started while one is in progress.
        self.pushButton.setEnabled(False)
        threading.Thread(target=self._import_rows, args=(df, configure_ssh),
                         daemon=True).start()

    def _append_log(self, text):
        """Append *text* to the log pane and scroll to the end (GUI thread)."""
        self.textBrowser.append(text)
        self.textBrowser.moveCursor(QtGui.QTextCursor.End)

    def _on_run_finished(self):
        """Re-enable the execute button once the worker is done (GUI thread)."""
        self.pushButton.setEnabled(True)

    def _import_rows(self, df, configure_ssh):
        """Worker-thread body: process every row and report via signals."""
        try:
            ssh_config = SshHostConfiguration() if configure_ssh else None
            for row in df.itertuples(index=False, name=None):
                ip = str(row[self._COL_IP]).strip()
                username = str(row[self._COL_USERNAME]).strip()
                password = str(row[self._COL_PASSWORD]).strip()
                if not self.checkIP(ip):
                    self.log_message.emit("{0}不是一个有效的ip地址,请检查导出结果.".format(ip))
                    continue
                try:
                    self._import_row(ip, username, password, ssh_config)
                except requests.RequestException:
                    self.log_message.emit("{0}: Connection timeout!".format(ip))
        finally:
            self.log_message.emit("执行完毕.....................")
            self.run_finished.emit()

    def _import_row(self, ip, username, password, ssh_config):
        """Create/update the credential for one host and optionally its SSH entry."""
        cred_id = description = ip
        api = jenkins_credentials()
        exists = api.searchEOSCredentials(self.url, self.domain_name, cred_id,
                                          self.jenkinsUserName, self.jenkinsPassword)
        if exists is None:
            self.log_message.emit("{0}: Connection timeout!".format(ip))
            return
        if exists:
            status = api.updateEOSCredentials(self.url, username, password, cred_id,
                                              description, self.jenkinsUserName,
                                              self.jenkinsPassword)
            done_text = '{0}的凭据已经存在,执行更新密码操作。'
        else:
            status = api.createEOSCredentials(self.url, self.domain_name, username,
                                              password, cred_id, description,
                                              self.jenkinsUserName, self.jenkinsPassword)
            done_text = '完成{0}的凭据的添加操作。'
        if status is None:
            self.log_message.emit("{0}: Connection timeout!".format(ip))
            return
        if status >= 400:
            self.log_message.emit("{0}的凭据操作失败(HTTP状态码: {1})。".format(ip, status))
            return
        # The password is masked on purpose: the log pane is visible on screen.
        self.log_message.emit("ip = {0};username={1},password=******".format(ip, username))
        self.log_message.emit(done_text.format(ip))

        if ssh_config is None:
            return
        host_name = "{0}@{1}".format(username, ip)
        # Substring test against the console output, as the plugin offers no
        # structured lookup through this endpoint.
        if ip in ssh_config.getHostConfigurations():
            ssh_config.removeHostConfiguration(host_name)
            ssh_config.addHostConfiguration(host_name, ip, username,
                                            '/home/user/deployment', password)
            self.log_message.emit('{0}的SSH授权配置已经存在,执行更新配置操作。'.format(ip))
        else:
            ssh_config.addHostConfiguration(host_name, ip, username,
                                            '/home/user/deployment', password)
            self.log_message.emit('完成{0}的SSH授权的添加操作。'.format(ip))


if __name__ == "__main__":
    app = QApplication(sys.argv)
    w = Ui_jenkinslogin()
    w.show()
    sys.exit(app.exec_())
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论