[Python] 纯文本查看 复制代码import os
import subprocess
import time
import gitlab
from alibabacloud_devops20210625 import models as devops_20210625_models
from alibabacloud_devops20210625.client import Client as devops20210625Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
# 云效企业id
organization_id = ''
# 云效企业parentId
parent_id = 1234
# 云效access_key_id
access_key_id = '云效access_key_id'
# 云效access_key_secret
access_key_secret = "云效access_key_secret"
# gitlab地址
gitlab_url = 'http://*****/'
# gitlab token
gitlab_token = '云效的访问令牌'
# 初始化gitlab
gl = gitlab.Gitlab(url=gitlab_url, private_token=gitlab_token)
# 项目迁移临时下载文件夹
download_path = 'E:\\GitSpace'
# gitlab推送地址
gitlab_push_url = 'gitlab推送地址'
# 创建链接
def create_client():
"""
创建链接
:return: client
"""
config = open_api_models.Config(
access_key_id=access_key_id,
access_key_secret=access_key_secret,
)
config.endpoint = f'devops.cn-hangzhou.aliyuncs.com'
return devops20210625Client(config)
def getListRepositoryGroups():
"""
获取所有代码组
:return: 代码组列表
"""
client = create_client()
list_repository_groups_request = devops_20210625_models.ListRepositoryGroupsRequest(
parent_id=parent_id,
organization_id=organization_id,
page_size=100
)
runtime = util_models.RuntimeOptions()
headers = {}
try:
# 复制代码运行请自行打印 API 的返回值
return client.list_repository_groups_with_options(list_repository_groups_request, headers, runtime)
except Exception as error:
# 错误 message
print(error.message)
# 诊断地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
def getGroupInfo():
"""
获取所有代码组的详细信息
:return:
"""
response = getListRepositoryGroups()
# response转为json
response_json = response.to_map()
# 项目组列表
groupList = response_json['body']['result']
return groupList
# 获取代码组下面的所有仓库
def getRepositoriesByGroupId(groupId: str):
"""
获取代码组下面的所有仓库
:param groupId: 组id
:return: 仓库列表
"""
client = create_client()
list_group_repositories_request = devops_20210625_models.ListGroupRepositoriesRequest(
organization_id=organization_id
)
runtime = util_models.RuntimeOptions()
headers = {}
try:
# 复制代码运行请自行打印 API 的返回值
res = client.list_group_repositories_with_options(groupId, list_group_repositories_request, headers, runtime)
res_json = res.to_map()
return res_json['body']['result']
except Exception as error:
# 错误 message
print(error.message)
# 诊断地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
def getGitlabRepositories():
"""
获取gitlab所有的仓库
:return:
"""
projects = gl.projects.list(all=True)
repositories = []
for project in projects:
repositories.append(project.attributes)
return repositories
# 获取gitlab所有的组
def getGitlabGroups():
"""
获取gitlab所有的组
:return:
"""
groups = gl.groups.list(all=True)
group_list = []
for group in groups:
group_list.append(group.attributes)
return group_list
def deleteRepositoryInGitlab():
"""
删除gitlab中的仓库
:return:
"""
gitlab_repositories = getGitlabRepositories()
for repository in gitlab_repositories:
repository_name = repository['name']
print(f'删除gitlab仓库: {repository_name}')
gl.projects.delete(repository['id'])
def deleteGroupInGitlab():
"""
删除gitlab中的组
:return:
"""
gitlab_group = getGitlabGroups()
for group in gitlab_group:
group_name = group['name']
print(f'删除gitlab组: {group_name}')
gl.groups.delete(group['id'])
def createGroupInDevops():
"""
创建云效中的组到gitlab中
:return: None
"""
yunxiao_group = getGroupInfo()
gitlab_group = getGitlabGroups()
print(f'云效组数量: {len(yunxiao_group)}, gitlab组数量: {len(gitlab_group)}')
# 循环yunxiao_group
for group in yunxiao_group:
group_name = group['name']
# 如果group_name为空 则跳过
group_description = group['description']
# 是否存在
is_exist = False
for gitlab_info in gitlab_group:
gitlab_name = gitlab_info['name']
if group_name == gitlab_name:
is_exist = True
break
if not is_exist:
# 创建
print(f'创建gitlab组: {group_name}')
gl.groups.create({'name': group_name, 'path': group_name, 'description': group_description})
def clone_mirror(target_path, repo_url):
"""
在指定路径克隆一个裸仓库
Args:
target_path: 目标路径
repo_url: 远程仓库的URL
"""
print(f'克隆仓库: {repo_url}')
# 创建目标路径
os.makedirs(target_path, exist_ok=True)
# 切换到目标路径
os.chdir(target_path)
# 执行git clone --mirror命令 并显示输出
subprocess.call(['git', 'clone', '--mirror', repo_url])
# 返回项目路径
return os.path.join(target_path, os.path.basename(repo_url))
def push_mirror(repo_path, remote_url):
"""
推送一个裸仓库到远程仓库
Args:
repo_path: 本地裸仓库路径
remote_url: 远程仓库的URL
"""
# 切换到裸仓库路径
os.chdir(repo_path)
# 执行git push --mirror命令
subprocess.call(['git', 'push', '--mirror', remote_url])
print(f'推送成功: {remote_url}')
# 判断组中的项目是否存在
def isExistInGitlab(group_name, project_name):
"""
判断组中的项目是否存在
:param group_name: 组名
:param project_name: 项目名
:return: True/False
"""
projects = gl.projects.list(all=True)
for project in projects:
if project.attributes['name'] == project_name and project.attributes['namespace']['name'] == group_name:
return True
return False
if __name__ == '__main__':
# 开始创建群组
createGroupInDevops()
print('-----------------------------创建群组完成-----------------------------')
groupInfos = getGroupInfo()
for groupInfo in groupInfos:
print(f'开始迁移群组: {groupInfo["name"]},id: {groupInfo["id"]}')
repositories = getRepositoriesByGroupId(str(groupInfo['id']))
# 暂停1s
time.sleep(1)
for repository in repositories:
name = repository['name']
clone_url = repository['sshUrl']
group_name = groupInfo['name']
# 判断是否存在
if isExistInGitlab(group_name, name):
print(f'仓库已存在: {name}')
continue
print(f'开始迁移仓库: {repository["name"]},组名:{group_name}')
push_path = clone_mirror(download_path, clone_url)
print(f'克隆成功至路径: {push_path},开始推送至gitlab')
push_mirror(push_path, f'{gitlab_push_url}{group_name}/{name}.git')
# deleteGroupInGitlab()