暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Harbor仓库-API清理仓库镜像

小朱哥的技术人生 2020-06-28
1571


初衷

编写此脚本的初衷是因为在使用 harbor 的时候存储上升的很快不久就满了,需要进行清理镜像回收空间,但是在 harbor 的管理平台进行手动清理比较麻烦,工作量也大,并且也好像很low的样子.

在harbor中,清理镜像,也得分为两步,第一步是从ui中删除历史镜像,这个时候镜像并不会被真正删除,第二步清理镜像释放空间

功能

现在说明一下环境:我使用的 harbor 版本是:1.8.1的,其他版本我没进行测试,其它版本可以自行尝试,如api不同自行更改即可

这里需要说明一下程序只能使用镜像上传的时间进行删除!!!

实现的功能:

  • 使用requests的Session保持,直接记录auth无需每次提供auth参数
  • 使用traceback便于脚本出错的问题定位
  • 取消requests的Keep_alive,设置最大连接数以及重试次数
  • 通过 Harbor 提供的 API 获取到主要的信息
  • 可以设置要排除的项目(不对其项目就行删除)
  • 可以通过指定想要删除的时间进行删除
  • 可以指定仓库中保留的镜像数量
  • 删除后清理释放空间
  • 加入了执行时间统计
  • 一些详细信息的打印

需要修改的地方

这些需要修改的地方都在代码下面的main中,如果没有因为 API 版本问题其他代码无需修改

代码中需要修改的地方:

  • Harbor的 API URL
  • Harbor的认证信息(用户名,密码)
  • 需要排除的项目组project(无需排除设为空列表即可)

代码

具体代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/6/23 10:42 上午
# @Author : Brian
# @File : delete_image_harbor.py
# @Software: PyCharm

"""
通过harbor接口删除镜像
功能:
1. 可以指定要过滤的项目
2. 可以指定删除多久之前的镜像
3. 可以对仓库镜像保留指定
"""


import requests
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import json
from requests.auth import HTTPBasicAuth
import traceback
import time
import datetime
from tqdm import tqdm


class Harbor(object):
def __init__(self, api_url, user, exclude, beforetime, keepmirror):
"""
:param api_url: harbor的api的url,比如: https://xxxxx.com/api, 在最下面的main里面指定
:param user: 对harbor的认证
:param exclude: 排除项目的列表
:param beforetime: 删除多久的之前的镜像比如: 30
:param keepmirror: 对项目里面仓库的标签镜像保留的个数,比如: 1
"""

self.auth = user
self.url = api_url
self.project_exclude = exclude
self.beforetime = int(beforetime)
self.keepmirror = int(keepmirror)

# 请求头部
self.head = {"user_agent": "Mozilla/5.0"}
# 项目组对应项目组id字典
self.project_state = {}
# 项目id对应仓库数字典
self.project_special = {}
# 仓库对应镜像标签字典
self.tag_state = {}
# 仓库名称和镜像标签对应
self.repo_state = {}
# 删除仓库镜像标签(版本)的个数
self.repo_dispose_count = 0
self.setting()
self.timechange()

def setting(self):
"""
使用 requests的Session保持,直接记录auth无需每次提供auth参数,取消requests的Keep_alive,设置最大连接数以及重试次数为3次
:return:
"""

self.session = requests.Session()
self.session.auth = self.auth
retry = Retry(connect=3, backoff_factor=1)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("https//", adapter)
self.session.keep_alive = False

def timechange(self):
"""
获取到指定要删除多少天之前的日期
self.beforetime 指定了要删除多少天之前的镜像(使用日期)
:return:
"""

today = datetime.datetime.utcnow()
days_age = today - datetime.timedelta(days=self.beforetime)
self.days_age = str(days_age)

def list_project(self):
"""
使用 traceback 便于脚本出错的问题定位
访问 harbor 项目的接口,对返回的数据 json 解析取出想要的内容
:return:
"""

try:
r_project = self.session.get("{}/projects".format(self.url), headers=self.head)
r_project.raise_for_status()
project_data = json.loads(r_project.text)
for i in project_data:
# 获取项目的名字
project_name = i.get("name")
# 获取到项目的id
project_id = i.get("project_id")
# 获取到项目下仓库的统计
project_repo = i.get("repo_count")
# 将项目的名字和项目ID对应到project_state字典中
self.project_state[project_name] = project_id
# 将项目的ID和项目下仓库的个数对应到project_special字典中
self.project_special[project_id] = project_repo

print("\033[0;32m项目名称:{}\t项目编号:{}\t项目下仓库统计:{}\033[0m".format(project_name, project_id, project_repo))
print("\033[0;36mproject:项目组对应id列表:{}\033[0m".format(self.project_state))
print("\033[0;36mproject:项目id对应仓库数:{}\033[0m".format(self.project_special))
except:
traceback.print_exc()
raise


def list_repo(self):
"""
使用 traceback 便于脚本出错的问题定位
访问 Harbor 的仓库接口,指定了项目的id(获取那个项目下的仓库),指定了页数,设置了每一次接口返回的条数100条
:return:
"""

try:
# pro_name 为获取的项目组名比如: ops
for pro_name in self.project_state.keys():
# 判断 pro_name 项目名称是否在被过滤排除的项目组列表中
if pro_name not in self.project_exclude:
# 得到排除的项目组之外的项目的ID
pro_id = self.project_state.get(pro_name)
# 由于请求限制,得出需请求的次数,整除+1 (一页100个多了翻页)
number = self.project_special.get(pro_id) 100 + 1
for i in range(number):
page = i + 1
r_repo = self.session.get(
"{}/repositories?project_id={}&page={}&page_size=100".format(self.url, pro_id, page,
headers=self.head))
repo_data = json.loads(r_repo.text)
for r in repo_data:
# 获取到仓库的 ID
repo_id = r.get("id")
# 获取到仓库的名字
repo_name = r.get("name")
# 获取仓库里镜像的标签数
tag_count = r.get("tags_count")
# 将仓库名称和镜像标签数对应到一个字典
self.repo_state[repo_name] = tag_count
print("\033[0;31mrepo:排除部分项目组后,需处理的仓库总量为:{}\033[0m".format(len(self.repo_state)))
except:
traceback.print_exc()
raise

def list_tag(self):
"""
使用 traceback 便于脚本出错的问题定位
访问 Harbor 仓库标签的接口,获取仓库镜像标签列表
:return:
"""

try:
# 这里得到的n是仓库的名字
for n in self.repo_state.keys():
# print(n)
# 访问接口
r_tag = self.session.get('{}/repositories/{}/tags'.format(self.url, n, headers=self.head))
# 得到数据
tag_data = json.loads(r_tag.text)
# 存放时间对比后的标签的名字和日期
tag_dict = {}
# 存放日期对比后在进行镜像保留对比后的标签时间
tagtime_list = []
# 存放日期对比后在进行镜像保留对比后的标签名字
tagname_list = []

# 这里的t是要接口返回的数据
for t in tag_data:
# 获取到接口返回数据的日期
tag_time = t.get("created").split('T')[0]
# 判断日期是否小于指定要删除日期的时间
if tag_time < self.days_age:
tag_name = t.get("name")
tag_dict[tag_time] = tag_name
# 判断现在的镜像标签的个数是否和指定的一样
if len(tag_dict) > self.keepmirror:
tagtime_list.append(tag_time)
tagname_list.append(tag_name)
# print(n, "---->", tag_time, "---->", tag_name)
self.tag_state[n] = tagname_list
# 打印获取到的仓库名称和标签列表
print(self.tag_state)
self.repo_dispose_count += len(tagname_list)
print("\033[0;31mtag:对需处理的仓库进一步过滤现需处理涉及仓库共:{}个,涉及删除镜像版本共:{}个\033[0m".format(len(self.tag_state),
self.repo_dispose_count))
except:
traceback.print_exc()
raise

def del_tag(self):
"""
使用 traceback 便于脚本出错的问题定位
删除镜像标签,
:return:
"""

try:
# 定义删除的总数为了下面的统计
delete_total = 0
# 定义删除失败的列表
del_faild = []
# 判断是否有要删除的镜像
if self.repo_dispose_count == 0:
print("\033[0;34mdel:本次无需删除tag\033[0m")
else:
print("\033[0;34mdel:删除tag阶段耗时较长:请耐心等待\033[0m")
# 进度条设置
pbar1 = tqdm(total=self.repo_dispose_count, unit="个", unit_scale=True)

# 这里的na是仓库的名字
for na in self.tag_state:
# 这里的 ta 是镜像标签的名字
for ta in self.tag_state[na]:
try:
# 通过delete方式访问删除接口
r_del = self.session.delete(
"{}/repositories/{}/tags/{}".format(self.url, na, ta, headers=self.head, auth=self.auth))
r_del.raise_for_status()
# 删除一个加1一次
delete_total += 1
# 进度条更新
pbar1.update(1)
except:
print('del: {}仓库下删除版本号:{}失败!!!'.format(na, ta))
del_faild.append(na + ':' + ta)
time.sleep(3)
pbar1.close()
print("\033[0;34mdel:需删除镜像已完成删除!共删除版本数量:{}个\033[0m".format(delete_total))
print('删除失败共计:{},删除失败的为:{}'.format(len(del_faild), del_faild))
except:
traceback.print_exc()
raise


def volume_recycle(self):
"""
使用 traceback 便于脚本出错的问题定位
在harbor中,清理镜像,也得分为两步,第一步是从ui中删除历史镜像。这个时候镜像并不会被真正删除,需要执行一下清理镜像,这就是最后的步骤
清理镜像,腾出空间
:return:
"""

try:
# 判断有没有要删除的镜像
if self.repo_dispose_count == 0:
print("\033[0;35mvolume:本次无需清理存储\033[0m")
else:
# 定义一个垃圾清理的json(post需要的参数)
da = {"schedule": {"cron": "Manual", "type": "Manual"}}
print("\033[0;35mvolume:开始回收存储空间!\033[0m")
# 访问接口
r_volume = self.session.post('{}/system/gc/schedule'.format(self.url), json=da)
r_volume.raise_for_status()
print("\033[0;35mvolue:回收存储空间已完成!\033[0m")
except:
traceback.print_exc()
raise


def main(api_url, login, exclude, beforetime, keepmirror):
"""
程序执行的入口
:param api_url: harbor api地址
:param login: 登录认证
:param exclude: 过滤的列表
:param beforetime: 删除多久之前的镜像,指定天数
:param keepmirror: 对仓库的镜像保留的个数
:return:
"""

# 为了计数设置开始时间
start = time.time()
try:
# 实例化 Harbor 类(传入参数)
har = Harbor(api_url=api_url, user=login, exclude=exclude, beforetime=beforetime, keepmirror=keepmirror)
# 获取项目
har.list_project()
# 获取仓库
har.list_repo()
# 获取仓库镜像标签
har.list_tag()
# 删除镜像
har.del_tag()
# 清理空间
har.volume_recycle()
print("所有操作运行完成!")
# 结束时间
end = time.time()
# 统计时间
allTime = end - start
print("运行结束共耗时:{:.2f}s".format(allTime))
except:
end = time.time()
allTime = end - start
print('清理出错!')
print("运行结束共耗时:{:.2f}s".format(allTime))



if __name__ == '__main__':
# harbor api url(自行更换)
api_url = "https://harbor.xxxxxx.com/api"
# harbor用户名密码(自行更换)
login = HTTPBasicAuth("harboruser", "harborpass")
# 过滤的项目名称(自行更换)
exclude = ["java", "php", "python", "kubernetes"]
# 没有过滤的项目可以保持空列表
# exclude = []
# 删除多久之前的镜像
beforetime = 90
# 要保留的镜像个数
keepmirror = 2
# 执行入口程序
main(api_url=api_url, login=login, exclude=exclude, beforetime=beforetime, keepmirror=keepmirror)


注释可能写的不太好,如果有问题可以留言或者联系我!!!

文章转载自小朱哥的技术人生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论