mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 07:35:07 +00:00
8833e5c1b5
* xftp: add PostgreSQL backend design spec
* update doc
* adjust styling
* add implementation plan
* refactor: move usedStorage from FileStore to XFTPEnv
* refactor: add getUsedStorage, getFileCount, expiredFiles store functions
* refactor: change file store operations from STM to IO
* refactor: extract FileStoreClass typeclass, move STM impl to Store.STM
* refactor: make XFTPEnv and server polymorphic over FileStoreClass
* feat: add PostgreSQL store skeleton with schema migration
* feat: implement PostgresFileStore operations
* feat: add PostgreSQL INI config, store dispatch, startup validation
* feat: add database import/export CLI commands
* test: add PostgreSQL backend tests
* fix: map ForeignKeyViolation to AUTH in addRecipient
When a file is concurrently deleted while addRecipient runs, the FK
constraint on recipients.sender_id raises ForeignKeyViolation. Previously
this propagated as INTERNAL; now it returns AUTH (file not found).
* fix: only decrement usedStorage for uploaded files on expiration
expireServerFiles unconditionally subtracted file_size from usedStorage
for every expired file, including files that were never uploaded (no
file_path). Since reserve only increments usedStorage during upload,
expiring never-uploaded files caused usedStorage to drift negative.
* fix: handle setFilePath error in receiveServerFile
setFilePath result was discarded with void. If it failed (file deleted
concurrently, or double-upload where file_path IS NULL guard rejected
the second write), the server still reported FROk, incremented stats,
and left usedStorage permanently inflated. Now the error is checked:
on failure, reserved storage is released and AUTH is returned.
* fix: escape double quotes in COPY CSV status field
The status field (e.g. "blocked,reason=spam,notice={...}") is quoted in
CSV for COPY protocol, but embedded double quotes from BlockingInfo
notice (JSON) were not escaped. This could break CSV parsing during
import. Now double quotes are escaped as "" per CSV spec.
* fix: reject upload to blocked file in Postgres setFilePath
In Postgres mode, getFile returns a snapshot TVar for fileStatus. If a
file is blocked between getFile and setFilePath, the stale status check
passes but the upload should be rejected. Added status = 'active' to
the UPDATE WHERE clause so blocked files cannot receive uploads.
* fix: add CHECK constraint on file_size > 0
Prevents negative or zero file_size values at the database level.
Without this, corrupted data from import or direct DB access could
cause incorrect storage accounting (getUsedStorage sums file_size,
and expiredFiles casts to Word32 which wraps negative values).
* fix: check for existing data before database import
importFileStore now checks if the target database already contains
files and aborts with an error. Previously, importing into a non-empty
database would fail mid-COPY on duplicate primary keys, leaving the
database in a partially imported state.
* fix: clean up disk file when setFilePath fails in receiveServerFile
When setFilePath fails (file deleted or blocked concurrently, or
duplicate upload), the uploaded file was left orphaned on disk with
no DB record pointing to it. Now the file is removed on failure,
matching the cleanup in the receiveChunk error path.
* fix: check storeAction result in deleteOrBlockServerFile_
The store action result (deleteFile/blockFile) was discarded with void.
If the DB row was already deleted by a concurrent operation, the
function still decremented usedStorage, causing drift. Now the error
propagates via ExceptT, skipping the usedStorage adjustment.
* fix: check deleteFile result in expireServerFiles
deleteFile result was discarded with void. If a concurrent delete
already removed the file, deleteFile returned AUTH but usedStorage
was still decremented — causing double-decrement drift. Now the
usedStorage adjustment and filesExpired stat only run on success.
* refactor: merge STM store into Store.hs, parameterize server tests
- Move STMFileStore and its FileStoreClass instance from Store/STM.hs
back into Store.hs — the separate file was unnecessary indirection
for the always-present default implementation.
- Parameterize xftpFileTests over store backend using HSpec SpecWith
pattern (following SMP's serverTests approach). The same 11 tests
now run against both memory and PostgreSQL backends via a bracket
parameter, eliminating all *Pg test duplicates.
- Extract shared run* functions (runTestFileChunkDeliveryAddRecipients,
runTestWrongChunkSize, runTestFileChunkExpiration, runTestFileStorageQuota)
from inlined test bodies.
* refactor: clean up per good-code review
- Remove internal helpers from Postgres.hs export list (withDB, withDB',
handleDuplicate, assertUpdated, withLog are not imported externally)
- Replace local isNothing_ with Data.Maybe.isNothing in Env.hs
- Consolidate duplicate/unused imports in XFTPStoreTests.hs
- Add file_path IS NULL and status guards to STM setFilePath, matching
the Postgres implementation semantics
* test: parameterize XFTP server, agent and CLI tests over store backend
- xftpTest/xftpTest2/xftpTest4/xftpTestN now take XFTPTestBracket as
first argument, enabling the same test to run against both memory
and PostgreSQL backends.
- xftpFileTests (server tests), xftpAgentFileTests (agent tests), and
xftpCLIFileTests (CLI tests) are SpecWith-parameterized suites that
receive the bracket from HSpec's before combinator.
- Test.hs runs each parameterized suite twice: once with
xftpMemoryBracket, once with xftpPostgresBracket (CPP-guarded).
- STM-specific tests (store log restore/replay) stay in memory-only
xftpAgentTests. SNI/CORS tests stay in memory-only xftpServerTests.
* refactor: remove dead test wrappers after parameterization
Remove old non-parameterized test wrapper functions that were
superseded by the store-backend-parameterized test suites.
All test bodies (run* and _ functions) are preserved and called
from the parameterized specs. Clean up unused imports.
* feat: add manual tests and guide
* refactor: merge file_size CHECK into initial migration
* refactor: extract rowToFileRec shared by getFile sender/recipient paths
* refactor: parameterize XFTPServerConfig over store type
Embed XFTPStoreConfig s as serverStoreCfg field, matching SMP's
ServerConfig. runXFTPServer and newXFTPServerEnv now take a single
XFTPServerConfig s. Restore verifyCmd local helper structure.
* refactor: minimize diff in tests
Restore xftpServerTests and xftpAgentTests bodies to match master
byte-for-byte (only type signatures change for XFTPTestBracket
parameterization); inline the runTestXXX helpers that were split
on this branch.
* refactor: restore getFile position to match master
* refactor: rename withSTMFile back to withFile
* refactor: close store log inside closeFileStore for STM backend
Move STM store log close responsibility into closeFileStore to
match PostgresFileStore, removing the asymmetry where only PG's
close was self-contained.
STMFileStore holds the log in a TVar populated by newXFTPServerEnv
after readWriteFileStore; stopServer no longer needs the explicit
withFileLog closeStoreLog call. Writes still go through XFTPEnv.storeLog
via withFileLog (unchanged).
* refactor: rename XFTPTestBracket to XFTPTestServer
* fix: move file_size check from PG schema to store log import
* refactor: use SQL-standard type names in XFTP schema
* perf: batch expired file deletions with deleteFiles
* refactor: stream export instead of loading recipients into memory
* refactor: parameterize XFTP store with FSType singleton dispatch
* refactor: minimize diff per review feedback
* refactor: use types over strings, deduplicate parser
* refactor: always parse database store type, fail at startup
* fix compilation without postgresql
* refactor: always parse database store type, fail at startup
1444 lines
47 KiB
Python
1444 lines
47 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Automated XFTP server test suite.
|
|
Tests memory and PostgreSQL backends, migration, persistence, and edge cases.
|
|
|
|
Prerequisites:
|
|
cabal build -fserver_postgres exe:xftp-server exe:xftp
|
|
PostgreSQL running (set PGHOST if non-default socket)
|
|
User 'xftp' with SUPERUSER must exist:
|
|
psql -U postgres -c "CREATE USER xftp WITH SUPERUSER;"
|
|
|
|
Usage:
|
|
python3 tests/manual/xftp-test.py
|
|
PGHOST=/tmp/pgsocket python3 tests/manual/xftp-test.py
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from pathlib import Path
|
|
|
|
|
|
# --- Configuration ---
|
|
|
|
PORT = "7921"
|
|
DB_NAME = "xftp_server_store"
|
|
DB_USER = "xftp"
|
|
DB_SCHEMA = "xftp_server"
|
|
PG_ADMIN_USER = "postgres"
|
|
|
|
|
|
# --- State ---
|
|
|
|
PASS = 0
|
|
FAIL = 0
|
|
server_proc = None
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
def run(cmd, *, check=True, input=None, timeout=30):
|
|
"""Run a command, return CompletedProcess."""
|
|
r = subprocess.run(
|
|
cmd, shell=isinstance(cmd, str),
|
|
capture_output=True, text=True,
|
|
input=input, timeout=timeout,
|
|
)
|
|
if check and r.returncode != 0:
|
|
raise subprocess.CalledProcessError(r.returncode, cmd, r.stdout, r.stderr)
|
|
return r
|
|
|
|
|
|
def cabal_bin(name):
|
|
r = run(f"cabal list-bin exe:{name}")
|
|
p = r.stdout.strip()
|
|
if not os.path.isfile(p):
|
|
sys.exit(f"Binary not found: {p}\nRun: cabal build -fserver_postgres exe:{name}")
|
|
return p
|
|
|
|
|
|
def psql(sql, *, user=PG_ADMIN_USER, db="postgres", check=True):
|
|
return run(["psql", "-U", user, "-d", db, "-t", "-A", "-c", sql], check=check)
|
|
|
|
|
|
def db_count(table):
|
|
r = psql(f"SET search_path TO {DB_SCHEMA}; SELECT count(*) FROM {table};",
|
|
user=DB_USER, db=DB_NAME, check=False)
|
|
if r.returncode != 0:
|
|
return -1
|
|
# psql -t -A output includes "SET" line from SET search_path, take the last line
|
|
lines = [l.strip() for l in r.stdout.strip().split("\n") if l.strip() and l.strip() != "SET"]
|
|
return int(lines[-1]) if lines else -1
|
|
|
|
|
|
def pass_(desc):
|
|
global PASS
|
|
PASS += 1
|
|
print(f" [PASS] {desc}")
|
|
|
|
|
|
def fail_(desc):
|
|
global FAIL
|
|
FAIL += 1
|
|
print(f" [FAIL] {desc}")
|
|
|
|
|
|
def check(desc, condition):
|
|
if condition:
|
|
pass_(desc)
|
|
else:
|
|
fail_(desc)
|
|
|
|
|
|
|
|
# --- INI helpers ---
|
|
|
|
def ini_set(key, value):
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
new_txt, n = re.subn(rf"^{re.escape(key)}:.*$", f"{key}: {value}", txt, flags=re.MULTILINE)
|
|
assert n > 0, f"ini_set: key '{key}' not found in {ini}"
|
|
ini.write_text(new_txt)
|
|
|
|
|
|
def ini_uncomment(key):
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
new_txt, n = re.subn(rf"^# {re.escape(key)}:", f"{key}:", txt, flags=re.MULTILINE)
|
|
assert n > 0, f"ini_uncomment: commented key '# {key}' not found in {ini}"
|
|
ini.write_text(new_txt)
|
|
|
|
|
|
def ini_comment(key):
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
new_txt, n = re.subn(rf"^{re.escape(key)}:", f"# {key}:", txt, flags=re.MULTILINE)
|
|
assert n > 0, f"ini_comment: key '{key}' not found in {ini}"
|
|
ini.write_text(new_txt)
|
|
|
|
|
|
def ini_path():
|
|
return Path(os.environ["XFTP_SERVER_CFG_PATH"]) / "file-server.ini"
|
|
|
|
|
|
# --- Server management ---
|
|
|
|
def init_server(quota="10gb"):
|
|
run([XFTP_SERVER, "init", "-p", str(test_dir / "files"), "-q", quota, "--ip", "127.0.0.1"])
|
|
ini_set("port", PORT)
|
|
fp = (Path(os.environ["XFTP_SERVER_CFG_PATH"]) / "fingerprint").read_text().strip()
|
|
return f"xftp://{fp}@127.0.0.1:{PORT}"
|
|
|
|
|
|
_server_log_fh = None
|
|
|
|
def start_server(*extra_args):
|
|
global server_proc, _server_log_fh
|
|
stop_server()
|
|
log_path = test_dir / "server.log"
|
|
_server_log_fh = open(log_path, "w")
|
|
server_proc = subprocess.Popen(
|
|
[XFTP_SERVER, "start"] + list(extra_args),
|
|
stdout=_server_log_fh,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
time.sleep(2)
|
|
if server_proc.poll() is not None:
|
|
_server_log_fh.close()
|
|
_server_log_fh = None
|
|
log = log_path.read_text()
|
|
print(f" [ERROR] Server exited with code {server_proc.returncode}")
|
|
for line in log.strip().split("\n")[-5:]:
|
|
print(f" {line}")
|
|
return False
|
|
return True
|
|
|
|
|
|
def stop_server():
|
|
global server_proc, _server_log_fh
|
|
if server_proc and server_proc.poll() is None:
|
|
server_proc.send_signal(signal.SIGTERM)
|
|
try:
|
|
server_proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
server_proc.kill()
|
|
server_proc.wait()
|
|
server_proc = None
|
|
if _server_log_fh:
|
|
_server_log_fh.close()
|
|
_server_log_fh = None
|
|
time.sleep(0.5)
|
|
|
|
|
|
def clean_test_dir():
|
|
stop_server()
|
|
if test_dir.exists():
|
|
shutil.rmtree(test_dir)
|
|
(test_dir / "files").mkdir(parents=True)
|
|
(test_dir / "descriptions").mkdir()
|
|
(test_dir / "received").mkdir()
|
|
|
|
|
|
def clean_db():
|
|
psql(f"DROP DATABASE IF EXISTS {DB_NAME};")
|
|
psql(f"CREATE DATABASE {DB_NAME} OWNER {DB_USER};")
|
|
|
|
|
|
def enable_db_mode():
|
|
ini_set("store_files", "database")
|
|
ini_uncomment("db_connection")
|
|
ini_uncomment("db_schema")
|
|
ini_uncomment("db_pool_size")
|
|
|
|
|
|
def disable_db_mode():
|
|
ini_set("store_files", "memory")
|
|
ini_comment("db_connection")
|
|
ini_comment("db_schema")
|
|
ini_comment("db_pool_size")
|
|
|
|
|
|
# --- File operations ---
|
|
|
|
def make_file(name, size_mb=1):
|
|
path = test_dir / name
|
|
with open(path, "wb") as f:
|
|
f.write(os.urandom(size_mb * 1024 * 1024))
|
|
return path
|
|
|
|
|
|
def desc_dir(name):
|
|
return test_dir / "descriptions" / f"{name}.xftp"
|
|
|
|
|
|
def send_file(src, n=1):
|
|
return run([XFTP, "send", str(src), str(test_dir / "descriptions"),
|
|
"-s", srv, "-n", str(n)], check=False, timeout=60)
|
|
|
|
|
|
def recv_file(desc_path):
|
|
return run([XFTP, "recv", str(desc_path), str(test_dir / "received"), "-y"],
|
|
check=False, timeout=60)
|
|
|
|
|
|
def del_file(desc_path):
|
|
return run([XFTP, "del", str(desc_path), "-y"], check=False, timeout=30)
|
|
|
|
|
|
def files_match(a, b):
|
|
"""Compare two files byte-for-byte. Both must exist."""
|
|
a, b = Path(a), Path(b)
|
|
if not a.exists() or not b.exists():
|
|
return False
|
|
return a.read_bytes() == b.read_bytes()
|
|
|
|
|
|
def db_import():
|
|
return run([XFTP_SERVER, "database", "import"], input="Y\n", check=False, timeout=30)
|
|
|
|
|
|
def db_export():
|
|
return run([XFTP_SERVER, "database", "export"], input="Y\n", check=False, timeout=30)
|
|
|
|
|
|
def create_schema():
|
|
"""Create the xftp_server schema so the server can start on a fresh DB."""
|
|
psql(f"CREATE SCHEMA IF NOT EXISTS {DB_SCHEMA};", user=DB_USER, db=DB_NAME)
|
|
|
|
|
|
CONTROL_PORT = "15230"
|
|
CONTROL_PASSWORD = "testadmin"
|
|
|
|
|
|
def enable_control_port():
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
txt, n1 = re.subn(r"^# control_port:.*$", f"control_port: {CONTROL_PORT}", txt, flags=re.MULTILINE)
|
|
txt, n2 = re.subn(r"^# control_port_admin_password:.*$",
|
|
f"control_port_admin_password: {CONTROL_PASSWORD}", txt, flags=re.MULTILINE)
|
|
assert n1 > 0, "enable_control_port: '# control_port' not found in INI"
|
|
assert n2 > 0, "enable_control_port: '# control_port_admin_password' not found in INI"
|
|
ini.write_text(txt)
|
|
|
|
|
|
def control_recv(s):
|
|
"""Receive all available data from control port (drains buffer)."""
|
|
time.sleep(0.3)
|
|
chunks = []
|
|
s.settimeout(0.5)
|
|
while True:
|
|
try:
|
|
data = s.recv(4096)
|
|
if not data:
|
|
break
|
|
chunks.append(data)
|
|
except socket.timeout:
|
|
break
|
|
return b"".join(chunks).decode().strip()
|
|
|
|
|
|
def control_send_recv(s, cmd):
|
|
"""Send a command and receive the response line."""
|
|
s.sendall(f"{cmd}\n".encode())
|
|
return control_recv(s)
|
|
|
|
|
|
def control_connect():
|
|
"""Connect to control port, drain welcome banner, return socket."""
|
|
s = socket.create_connection(("127.0.0.1", int(CONTROL_PORT)), timeout=5)
|
|
try:
|
|
control_recv(s) # drain welcome banner (2 lines)
|
|
except Exception:
|
|
s.close()
|
|
raise
|
|
return s
|
|
|
|
|
|
def control_cmd(cmd, *, auth=True):
|
|
"""Send a command to the server control port, return the response.
|
|
If auth=True, authenticates as admin first and verifies the role."""
|
|
s = control_connect()
|
|
try:
|
|
if auth:
|
|
auth_resp = control_send_recv(s, f"auth {CONTROL_PASSWORD}")
|
|
assert auth_resp == "Current role is CPRAdmin", \
|
|
f"control_cmd: auth failed, got: {auth_resp!r}"
|
|
return control_send_recv(s, cmd)
|
|
finally:
|
|
try:
|
|
s.sendall(b"quit\n")
|
|
except OSError:
|
|
pass
|
|
s.close()
|
|
|
|
|
|
def get_recipient_ids(desc_path):
|
|
"""Extract recipient IDs from a file description (.xftp file)."""
|
|
text = Path(desc_path).read_text()
|
|
ids = []
|
|
for line in text.split("\n"):
|
|
line = line.strip()
|
|
if line.startswith("- ") and ":" in line:
|
|
# Format: - N:recipientId:privateKey:digest[:size]
|
|
parts = line[2:].split(":")
|
|
if len(parts) >= 3:
|
|
ids.append(parts[1])
|
|
return ids
|
|
|
|
|
|
|
|
# ===================================================================
|
|
# Tests
|
|
# ===================================================================
|
|
|
|
def test_1_basic_memory():
|
|
global srv
|
|
print("\n=== 1. Basic send/receive (memory) ===")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("testfile.bin", 5)
|
|
send_file(src, n=2)
|
|
|
|
dd = desc_dir("testfile.bin")
|
|
check("1.1 rcv1.xftp created", (dd / "rcv1.xftp").exists())
|
|
check("1.2 rcv2.xftp created", (dd / "rcv2.xftp").exists())
|
|
check("1.3 snd.xftp.private created", (dd / "snd.xftp.private").exists())
|
|
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("1.4 rcv1 file matches", files_match(src, test_dir / "received/testfile.bin"))
|
|
check("1.5 rcv1.xftp deleted by -y", not (dd / "rcv1.xftp").exists())
|
|
|
|
(test_dir / "received/testfile.bin").unlink(missing_ok=True)
|
|
recv_file(dd / "rcv2.xftp")
|
|
check("1.6 rcv2 file matches", files_match(src, test_dir / "received/testfile.bin"))
|
|
check("1.7 rcv2.xftp deleted by -y", not (dd / "rcv2.xftp").exists())
|
|
|
|
del_file(dd / "snd.xftp.private")
|
|
check("1.8 snd.xftp.private deleted by -y", not (dd / "snd.xftp.private").exists())
|
|
fc = len(list((test_dir / "files").iterdir()))
|
|
check(f"1.9 server files cleaned ({fc})", fc == 0)
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_2_basic_postgres():
|
|
global srv
|
|
print("\n=== 2. Basic send/receive (PostgreSQL) ===")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
# Remove store log so database mode starts cleanly
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
# Create schema so server can connect
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("2.1 server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
src = make_file("testfile.bin", 5)
|
|
send_file(src, n=2)
|
|
dd = desc_dir("testfile.bin")
|
|
check("2.2 send succeeded", (dd / "rcv1.xftp").exists())
|
|
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("2.3 recv matches", files_match(src, test_dir / "received/testfile.bin"))
|
|
|
|
fc = db_count("files")
|
|
rc = db_count("recipients")
|
|
check(f"2.4 files in database ({fc})", fc > 0 and fc != -1)
|
|
check(f"2.5 recipients in database ({rc})", rc > 0 and rc != -1)
|
|
|
|
del_file(dd / "snd.xftp.private")
|
|
fc_after = db_count("files")
|
|
rc_after = db_count("recipients")
|
|
check(f"2.6 all files deleted ({fc_after})", fc_after == 0)
|
|
check(f"2.7 all recipients deleted ({rc_after})", rc_after == 0)
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_3_migration_memory_to_pg():
|
|
global srv
|
|
print("\n=== 3. Migration: memory -> PostgreSQL ===")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
srcA = make_file("fileA.bin")
|
|
send_file(srcA, n=2)
|
|
check("3.1 fileA sent", (desc_dir("fileA.bin") / "rcv1.xftp").exists())
|
|
|
|
srcB = make_file("fileB.bin")
|
|
send_file(srcB, n=2)
|
|
check("3.2 fileB sent", (desc_dir("fileB.bin") / "rcv1.xftp").exists())
|
|
|
|
# Partially receive fileB
|
|
recv_file(desc_dir("fileB.bin") / "rcv1.xftp")
|
|
check("3.3 fileB rcv1 received", files_match(srcB, test_dir / "received/fileB.bin"))
|
|
|
|
stop_server()
|
|
|
|
# Switch to database and import
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("3.4 import succeeded", r.returncode == 0)
|
|
|
|
log_bak = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log.bak"
|
|
log_file = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
check("3.5 store log renamed to .bak", log_bak.exists())
|
|
check("3.6 store log removed", not log_file.exists())
|
|
|
|
fc = db_count("files")
|
|
rc = db_count("recipients")
|
|
check(f"3.7 files imported ({fc})", fc > 0 and fc != -1)
|
|
check(f"3.8 recipients imported ({rc})", rc > 0 and rc != -1)
|
|
|
|
# Start PG server, receive remaining
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("3.9 PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
recv_file(desc_dir("fileA.bin") / "rcv1.xftp")
|
|
check("3.10 fileA rcv1 after migration", files_match(srcA, test_dir / "received/fileA.bin"))
|
|
|
|
recv_file(desc_dir("fileA.bin") / "rcv2.xftp")
|
|
# rcv2 downloads to fileA_1.bin (fileA.bin already exists from rcv1)
|
|
rcv2_path = test_dir / "received"
|
|
rcv2_files = [f for f in rcv2_path.iterdir() if f.name.startswith("fileA") and f.name != "fileA.bin"]
|
|
check("3.11 fileA rcv2 after migration", len(rcv2_files) == 1 and files_match(srcA, rcv2_files[0]))
|
|
|
|
(test_dir / "received/fileB.bin").unlink(missing_ok=True)
|
|
recv_file(desc_dir("fileB.bin") / "rcv2.xftp")
|
|
check("3.12 fileB rcv2 after migration", files_match(srcB, test_dir / "received/fileB.bin"))
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_4_migration_pg_to_memory():
|
|
global srv
|
|
print("\n=== 4. Migration: PostgreSQL -> memory ===")
|
|
|
|
r = db_export()
|
|
check("4.1 export succeeded", r.returncode == 0)
|
|
|
|
log_file = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
check("4.2 store log created", log_file.exists())
|
|
|
|
disable_db_mode()
|
|
ok = start_server()
|
|
check("4.3 memory server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
r = del_file(desc_dir("fileA.bin") / "snd.xftp.private")
|
|
check("4.4 fileA delete on memory round-trip", r.returncode == 0)
|
|
|
|
r = del_file(desc_dir("fileB.bin") / "snd.xftp.private")
|
|
check("4.5 fileB delete on memory round-trip", r.returncode == 0)
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_4b_send_pg_receive_memory():
|
|
"""Send on PostgreSQL, export, receive on memory."""
|
|
global srv
|
|
print("\n=== 4b. Send on PG, export, receive on memory ===")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("4b.1 PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
srcA = make_file("pgfileA.bin")
|
|
send_file(srcA, n=2)
|
|
check("4b.2 pgfileA sent", (desc_dir("pgfileA.bin") / "rcv1.xftp").exists())
|
|
|
|
# Partially receive rcv1 on PG
|
|
recv_file(desc_dir("pgfileA.bin") / "rcv1.xftp")
|
|
check("4b.3 pgfileA rcv1 on PG", files_match(srcA, test_dir / "received/pgfileA.bin"))
|
|
|
|
stop_server()
|
|
|
|
# Export to store log
|
|
r = db_export()
|
|
check("4b.4 export succeeded", r.returncode == 0)
|
|
|
|
# Switch to memory
|
|
disable_db_mode()
|
|
ok = start_server()
|
|
check("4b.5 memory server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
# rcv2 should work on memory backend
|
|
(test_dir / "received/pgfileA.bin").unlink(missing_ok=True)
|
|
recv_file(desc_dir("pgfileA.bin") / "rcv2.xftp")
|
|
check("4b.6 pgfileA rcv2 on memory after export", files_match(srcA, test_dir / "received/pgfileA.bin"))
|
|
|
|
del_file(desc_dir("pgfileA.bin") / "snd.xftp.private")
|
|
check("4b.7 delete on memory", not (desc_dir("pgfileA.bin") / "snd.xftp.private").exists())
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_5_restart_persistence():
|
|
global srv
|
|
print("\n=== 5. Restart persistence ===")
|
|
|
|
# 5.1 Memory with store log
|
|
print(" --- 5.1 memory + store log ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("persist.bin")
|
|
send_file(src)
|
|
stop_server()
|
|
assert start_server()
|
|
|
|
recv_file(desc_dir("persist.bin") / "rcv1.xftp")
|
|
check("5.1 recv after restart (memory+log)", files_match(src, test_dir / "received/persist.bin"))
|
|
stop_server()
|
|
|
|
# 5.2 Memory without store log
|
|
print(" --- 5.2 memory, no log ---")
|
|
for f in (test_dir / "descriptions").iterdir():
|
|
shutil.rmtree(f) if f.is_dir() else f.unlink()
|
|
for f in (test_dir / "received").iterdir():
|
|
f.unlink()
|
|
ini_set("enable", "off")
|
|
assert start_server()
|
|
|
|
src2 = make_file("persist2.bin")
|
|
send_file(src2)
|
|
stop_server()
|
|
assert start_server()
|
|
|
|
r = recv_file(desc_dir("persist2.bin") / "rcv1.xftp")
|
|
check("5.2a recv after restart (no log) fails", r.returncode != 0)
|
|
check("5.2b error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
# 5.3 PostgreSQL
|
|
print(" --- 5.3 PostgreSQL ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("5.3a PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
src = make_file("persist.bin")
|
|
send_file(src)
|
|
stop_server()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("5.3b PG server restarted", ok)
|
|
if not ok:
|
|
return
|
|
|
|
recv_file(desc_dir("persist.bin") / "rcv1.xftp")
|
|
check("5.3 recv after restart (PostgreSQL)", files_match(src, test_dir / "received/persist.bin"))
|
|
stop_server()
|
|
|
|
|
|
def test_6_edge_cases():
|
|
global srv
|
|
print("\n=== 6. Edge cases ===")
|
|
|
|
# 6.1 Receive after server-side delete
|
|
print(" --- 6.1 receive after delete ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("deltest.bin")
|
|
send_file(src, n=2)
|
|
del_file(desc_dir("deltest.bin") / "snd.xftp.private")
|
|
r = recv_file(desc_dir("deltest.bin") / "rcv2.xftp")
|
|
check("6.1a recv after server delete fails", r.returncode != 0)
|
|
check("6.1b error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
# 6.2 Multiple recipients, partial ack
|
|
print(" --- 6.2 multiple recipients ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("multi.bin")
|
|
send_file(src, n=3)
|
|
|
|
recv_file(desc_dir("multi.bin") / "rcv1.xftp")
|
|
check("6.2a rcv1 received", files_match(src, test_dir / "received/multi.bin"))
|
|
|
|
(test_dir / "received/multi.bin").unlink(missing_ok=True)
|
|
recv_file(desc_dir("multi.bin") / "rcv2.xftp")
|
|
check("6.2b rcv2 still works", files_match(src, test_dir / "received/multi.bin"))
|
|
|
|
(test_dir / "received/multi.bin").unlink(missing_ok=True)
|
|
recv_file(desc_dir("multi.bin") / "rcv3.xftp")
|
|
check("6.2c rcv3 still works", files_match(src, test_dir / "received/multi.bin"))
|
|
stop_server()
|
|
|
|
# 6.3 Database mode with existing store log should fail
|
|
# Simulates: ran server in memory mode (creating store log), then switched to database
|
|
print(" --- 6.3 database mode + existing store log ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
# Start in memory mode to create the store log file
|
|
assert start_server()
|
|
make_file("dummy63.bin")
|
|
send_file(test_dir / "dummy63.bin")
|
|
stop_server()
|
|
# Verify store log was created
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
assert store_log.exists(), "store log should exist after memory-mode run"
|
|
# Switch to database mode without importing
|
|
enable_db_mode()
|
|
create_schema()
|
|
log_file = test_dir / "server-63.log"
|
|
with open(log_file, "w") as fh:
|
|
p = subprocess.Popen(
|
|
[XFTP_SERVER, "start", "--confirm-migrations", "up"],
|
|
stdout=fh, stderr=subprocess.STDOUT,
|
|
)
|
|
time.sleep(5)
|
|
exited = p.poll() is not None
|
|
if not exited:
|
|
p.kill()
|
|
p.wait()
|
|
log_text = log_file.read_text()
|
|
check("6.3a server exited", exited)
|
|
check("6.3b error message correct",
|
|
"store log file" in log_text and "exists but store_files is" in log_text)
|
|
|
|
# 6.4 Database mode, no store log, schema doesn't exist (should fail)
|
|
print(" --- 6.4 database mode + no schema ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
# No schema, no store log — server should fail with "schema does not exist"
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("6.4a start fails without schema", not ok)
|
|
log_text = (test_dir / "server.log").read_text() if (test_dir / "server.log").exists() else ""
|
|
check("6.4b error mentions schema", "schema" in log_text and "does not exist" in log_text)
|
|
stop_server()
|
|
|
|
# 6.5 Dual-write mode: database + db_store_log: on
|
|
# Verifies that new writes in dual-write mode land in BOTH the DB and the store log,
|
|
# so switching to memory-only (using the store log) preserves files sent in dual-write.
|
|
print(" --- 6.5 database + store log + db_store_log: on ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
ini_uncomment("db_store_log")
|
|
ini_set("db_store_log", "on")
|
|
create_schema()
|
|
# Remove store log so import isn't needed for initial start
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("6.5a start in dual-write mode", ok)
|
|
if not ok:
|
|
stop_server()
|
|
else:
|
|
# Send a NEW file in dual-write mode
|
|
src = make_file("dual.bin")
|
|
send_file(src, n=1)
|
|
dd = desc_dir("dual.bin")
|
|
check("6.5b send in dual-write mode", (dd / "rcv1.xftp").exists())
|
|
|
|
stop_server()
|
|
|
|
# Verify store log was written (dual-write)
|
|
check("6.5c store log has entries",
|
|
store_log.exists() and store_log.stat().st_size > 0)
|
|
|
|
# Verify DB has the file too
|
|
fc = db_count("files")
|
|
check(f"6.5d file in DB ({fc})", fc > 0 and fc != -1)
|
|
|
|
# Now switch to memory-only using the store log — proves the log has valid data
|
|
disable_db_mode()
|
|
ini_comment("db_store_log")
|
|
ok = start_server()
|
|
check("6.5e memory server from dual-write log", ok)
|
|
if ok:
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("6.5f recv on memory from dual-write log",
|
|
files_match(src, test_dir / "received/dual.bin"))
|
|
stop_server()
|
|
|
|
# 6.6 Import to non-empty database should fail
|
|
# Use db_export to produce a real store log, then try to re-import without clearing DB.
|
|
print(" --- 6.6 import to non-empty DB ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
if ok:
|
|
make_file("dummy.bin")
|
|
send_file(test_dir / "dummy.bin")
|
|
stop_server()
|
|
# Export produces a real, valid store log
|
|
r = db_export()
|
|
check("6.6a export for re-import test", r.returncode == 0)
|
|
# Now try to import the valid log back into the non-empty DB
|
|
r = db_import()
|
|
check("6.6b import to non-empty DB fails", r.returncode != 0)
|
|
else:
|
|
fail_("6.6 could not start server for setup")
|
|
|
|
# 6.7 Import with no store log file (should fail)
|
|
print(" --- 6.7 import without store log ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
r = db_import()
|
|
check("6.7 import without store log fails", r.returncode != 0)
|
|
|
|
# 6.8 Export when store log already exists (should fail)
|
|
print(" --- 6.8 export with existing store log ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
create_schema()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
ok = start_server("--confirm-migrations", "up")
|
|
if ok:
|
|
make_file("exp.bin")
|
|
send_file(test_dir / "exp.bin")
|
|
stop_server()
|
|
# Create a log file to block export
|
|
store_log.write_text("existing\n")
|
|
r = db_export()
|
|
check("6.8 export with existing store log fails", r.returncode != 0)
|
|
else:
|
|
fail_("6.8 could not start server for setup")
|
|
|
|
|
|
def test_7_blocking():
|
|
"""Block files via control port, verify blocked state survives migration."""
|
|
global srv
|
|
print("\n=== 7. File blocking via control port ===")
|
|
|
|
# 7.1 Block a file and verify receive fails with BLOCKED
|
|
print(" --- 7.1 block file, receive fails ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
enable_control_port()
|
|
assert start_server()
|
|
|
|
src = make_file("blockme.bin")
|
|
send_file(src, n=2)
|
|
dd = desc_dir("blockme.bin")
|
|
|
|
# Get recipient IDs from the file description
|
|
rcv_ids = get_recipient_ids(dd / "rcv1.xftp")
|
|
check("7.1a got recipient IDs", len(rcv_ids) > 0)
|
|
|
|
# Block using the first chunk's recipient ID
|
|
resp = control_cmd(f"block {rcv_ids[0]} reason=spam")
|
|
check("7.1b block command OK", resp == "ok")
|
|
|
|
# Receive should fail with BLOCKED
|
|
r = recv_file(dd / "rcv1.xftp")
|
|
output = r.stdout + r.stderr
|
|
check("7.1c receive blocked file fails", r.returncode != 0)
|
|
check("7.1d error is BLOCKED (not AUTH)", "BLOCKED" in output and "AUTH" not in output)
|
|
|
|
stop_server()
|
|
|
|
# 7.2 Blocked file survives migration memory -> PG
|
|
print(" --- 7.2 blocked file survives memory->PG migration ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_control_port()
|
|
assert start_server()
|
|
|
|
src = make_file("blockmigrate.bin")
|
|
send_file(src, n=2)
|
|
dd = desc_dir("blockmigrate.bin")
|
|
|
|
rcv_ids = get_recipient_ids(dd / "rcv1.xftp")
|
|
resp = control_cmd(f"block {rcv_ids[0]} reason=content")
|
|
check("7.2a block before migration", resp == "ok")
|
|
|
|
stop_server()
|
|
|
|
# Import to PG
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("7.2b import succeeded", r.returncode == 0)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("7.2c PG server started", ok)
|
|
if ok:
|
|
r = recv_file(dd / "rcv1.xftp")
|
|
check("7.2d recv fails after migration", r.returncode != 0)
|
|
check("7.2e error is BLOCKED", "BLOCKED" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
# 7.3 Blocked file survives migration PG -> memory
|
|
print(" --- 7.3 blocked file survives PG->memory export ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_control_port()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("7.3a PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
src = make_file("blockpg.bin")
|
|
send_file(src, n=2)
|
|
dd = desc_dir("blockpg.bin")
|
|
|
|
rcv_ids = get_recipient_ids(dd / "rcv1.xftp")
|
|
resp = control_cmd(f"block {rcv_ids[0]} reason=spam")
|
|
check("7.3b block on PG", resp == "ok")
|
|
|
|
stop_server()
|
|
|
|
# Export to memory
|
|
r = db_export()
|
|
check("7.3c export succeeded", r.returncode == 0)
|
|
|
|
disable_db_mode()
|
|
ok = start_server()
|
|
check("7.3d memory server started", ok)
|
|
if ok:
|
|
r = recv_file(dd / "rcv1.xftp")
|
|
check("7.3e recv fails after PG->memory", r.returncode != 0)
|
|
check("7.3f error is BLOCKED", "BLOCKED" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
|
|
def test_8_migration_edge_cases():
|
|
"""Edge cases in migration: acked recipients, deleted files, large files, double round-trip."""
|
|
global srv
|
|
print("\n=== 8. Migration edge cases ===")
|
|
|
|
# 8.1 Acked recipient fails after memory->PG migration
|
|
print(" --- 8.1 acked recipient fails after memory->PG ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("acktest.bin")
|
|
send_file(src, n=2)
|
|
dd = desc_dir("acktest.bin")
|
|
|
|
# Copy rcv1 descriptor before recv (recv -y deletes it)
|
|
rcv1_backup = test_dir / "rcv1_acktest.xftp"
|
|
shutil.copy2(dd / "rcv1.xftp", rcv1_backup)
|
|
|
|
# Receive rcv1 (acknowledges it on server, deletes descriptor)
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("8.1a rcv1 received", files_match(src, test_dir / "received/acktest.bin"))
|
|
|
|
stop_server()
|
|
|
|
# Migrate to PG
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("8.1b import succeeded", r.returncode == 0)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.1c PG server started", ok)
|
|
if ok:
|
|
# Acked rcv1 should fail — recipient was removed by ack before migration
|
|
r = recv_file(rcv1_backup)
|
|
check("8.1d acked rcv1 fails after migration", r.returncode != 0)
|
|
check("8.1e error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
|
|
# Unacked rcv2 should still work
|
|
(test_dir / "received/acktest.bin").unlink(missing_ok=True)
|
|
recv_file(dd / "rcv2.xftp")
|
|
check("8.1f rcv2 works after migration", files_match(src, test_dir / "received/acktest.bin"))
|
|
stop_server()
|
|
|
|
# 8.2 Acked recipient fails after PG->memory export
|
|
print(" --- 8.2 acked recipient fails after PG->memory ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.2a PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
src = make_file("ackpg.bin")
|
|
send_file(src, n=2)
|
|
dd = desc_dir("ackpg.bin")
|
|
|
|
# Copy rcv1 descriptor before recv
|
|
rcv1_backup = test_dir / "rcv1_ackpg.xftp"
|
|
shutil.copy2(dd / "rcv1.xftp", rcv1_backup)
|
|
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("8.2b rcv1 received on PG", files_match(src, test_dir / "received/ackpg.bin"))
|
|
|
|
stop_server()
|
|
|
|
r = db_export()
|
|
check("8.2c export succeeded", r.returncode == 0)
|
|
|
|
disable_db_mode()
|
|
ok = start_server()
|
|
check("8.2d memory server started", ok)
|
|
if ok:
|
|
# Acked rcv1 should fail
|
|
r = recv_file(rcv1_backup)
|
|
check("8.2e acked rcv1 fails after export", r.returncode != 0)
|
|
check("8.2f error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
|
|
# Unacked rcv2 should work
|
|
(test_dir / "received/ackpg.bin").unlink(missing_ok=True)
|
|
recv_file(dd / "rcv2.xftp")
|
|
check("8.2g rcv2 works on memory after export", files_match(src, test_dir / "received/ackpg.bin"))
|
|
stop_server()
|
|
|
|
# 8.3 Deleted file absent after migration
|
|
print(" --- 8.3 deleted file absent after migration ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
# Send a file that will be deleted before migration.
|
|
# Use n=2 so we have a rcv descriptor to test post-migration (rcv1 will be
|
|
# acked by the recv below; backup rcv2 before delete so we can try to recv
|
|
# it after migration — should return AUTH because the file was deleted).
|
|
srcDel = make_file("delmigrate.bin")
|
|
send_file(srcDel, n=2)
|
|
ddDel = desc_dir("delmigrate.bin")
|
|
# Backup rcv2 descriptor BEFORE delete (del doesn't touch rcv descriptors)
|
|
rcv2_del_backup = test_dir / "rcv2_delmigrate.xftp"
|
|
shutil.copy2(ddDel / "rcv2.xftp", rcv2_del_backup)
|
|
recv_file(ddDel / "rcv1.xftp")
|
|
del_file(ddDel / "snd.xftp.private")
|
|
|
|
# Send a positive control file that will NOT be deleted
|
|
srcKeep = make_file("keepmigrate.bin")
|
|
send_file(srcKeep, n=1)
|
|
check("8.3a keep file sent", (desc_dir("keepmigrate.bin") / "rcv1.xftp").exists())
|
|
|
|
stop_server()
|
|
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("8.3b import succeeded", r.returncode == 0)
|
|
|
|
# The kept file must be imported — proves import actually ran.
|
|
fc = db_count("files")
|
|
check(f"8.3c files imported ({fc})", fc > 0 and fc != -1)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.3d PG server started", ok)
|
|
if ok:
|
|
# Positive control: kept file is receivable after migration
|
|
recv_file(desc_dir("keepmigrate.bin") / "rcv1.xftp")
|
|
check("8.3e kept file receivable after migration",
|
|
files_match(srcKeep, test_dir / "received/keepmigrate.bin"))
|
|
|
|
# Negative control: deleted file's rcv2 must return AUTH after migration
|
|
r = recv_file(rcv2_del_backup)
|
|
check("8.3f deleted file rcv2 fails after migration", r.returncode != 0)
|
|
check("8.3g error is AUTH (deleted file absent)",
|
|
"AUTH" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
# 8.4 Large multi-chunk file integrity after migration
|
|
print(" --- 8.4 large file (multi-chunk) migration ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("largefile.bin", size_mb=20)
|
|
send_file(src, n=1)
|
|
dd = desc_dir("largefile.bin")
|
|
check("8.4a large file sent", (dd / "rcv1.xftp").exists())
|
|
|
|
stop_server()
|
|
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("8.4b import succeeded", r.returncode == 0)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.4c PG server started", ok)
|
|
if ok:
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("8.4d large file integrity after migration",
|
|
files_match(src, test_dir / "received/largefile.bin"))
|
|
stop_server()
|
|
|
|
# 8.5 Double round-trip: memory -> PG -> memory, then receive
|
|
print(" --- 8.5 double round-trip migration ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
assert start_server()
|
|
|
|
src = make_file("roundtrip.bin")
|
|
send_file(src, n=1)
|
|
dd = desc_dir("roundtrip.bin")
|
|
|
|
stop_server()
|
|
|
|
# memory -> PG
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("8.5a first import (memory->PG)", r.returncode == 0)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.5b PG server started", ok)
|
|
stop_server()
|
|
|
|
# PG -> memory
|
|
r = db_export()
|
|
check("8.5c first export (PG->memory)", r.returncode == 0)
|
|
|
|
disable_db_mode()
|
|
ok = start_server()
|
|
check("8.5d memory server started (round 1)", ok)
|
|
stop_server()
|
|
|
|
# memory -> PG again
|
|
clean_db()
|
|
enable_db_mode()
|
|
r = db_import()
|
|
check("8.5e second import (memory->PG)", r.returncode == 0)
|
|
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("8.5f PG server started (round 2)", ok)
|
|
if ok:
|
|
recv_file(dd / "rcv1.xftp")
|
|
check("8.5g file intact after double round-trip",
|
|
files_match(src, test_dir / "received/roundtrip.bin"))
|
|
stop_server()
|
|
|
|
|
|
def test_9_auth_and_access_control():
|
|
"""Basic auth, allowNewFiles, storage quota, file expiration."""
|
|
global srv
|
|
print("\n=== 9. Auth and access control ===")
|
|
|
|
# 9.1 AllowNewFiles=false rejects upload
|
|
print(" --- 9.1 allowNewFiles=false ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
ini_set("new_files", "off")
|
|
assert start_server()
|
|
|
|
src = make_file("reject.bin")
|
|
r = send_file(src)
|
|
check("9.1 upload rejected when new_files=off", r.returncode != 0)
|
|
stop_server()
|
|
|
|
# 9.2 Basic auth: no password → fails
|
|
print(" --- 9.2 basic auth: no password ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
ini_set("new_files", "on")
|
|
# Uncomment and set create_password
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
txt, n = re.subn(r"^# create_password:.*$", "create_password: secret123", txt, flags=re.MULTILINE)
|
|
assert n > 0, "create_password commented line not found in INI"
|
|
ini.write_text(txt)
|
|
assert start_server()
|
|
|
|
src = make_file("authtest.bin")
|
|
r = send_file(src)
|
|
check("9.2a upload without password fails", r.returncode != 0)
|
|
check("9.2b error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
stop_server()
|
|
|
|
# 9.3 Basic auth: wrong password → fails
|
|
print(" --- 9.3 basic auth: wrong password ---")
|
|
# Reinit with password in server address
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
ini_set("new_files", "on")
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
txt, n = re.subn(r"^# create_password:.*$", "create_password: secret123", txt, flags=re.MULTILINE)
|
|
assert n > 0, "create_password commented line not found in INI"
|
|
ini.write_text(txt)
|
|
fp = (Path(os.environ["XFTP_SERVER_CFG_PATH"]) / "fingerprint").read_text().strip()
|
|
wrong_srv = f"xftp://{fp}:wrongpass@127.0.0.1:{PORT}"
|
|
assert start_server()
|
|
|
|
src = make_file("authtest.bin")
|
|
r = run([XFTP, "send", str(src), str(test_dir / "descriptions"),
|
|
"-s", wrong_srv, "-n", "1"], check=False, timeout=30)
|
|
output = r.stdout + r.stderr
|
|
check("9.3a wrong password prints AUTH error", "PCEProtocolError AUTH" in output)
|
|
check("9.3b no descriptor created", not (desc_dir("authtest.bin") / "rcv1.xftp").exists())
|
|
stop_server()
|
|
|
|
# 9.4 Basic auth: correct password → succeeds
|
|
print(" --- 9.4 basic auth: correct password ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
ini_set("new_files", "on")
|
|
ini = ini_path()
|
|
txt = ini.read_text()
|
|
txt, n = re.subn(r"^# create_password:.*$", "create_password: secret123", txt, flags=re.MULTILINE)
|
|
assert n > 0, "create_password commented line not found in INI"
|
|
ini.write_text(txt)
|
|
fp = (Path(os.environ["XFTP_SERVER_CFG_PATH"]) / "fingerprint").read_text().strip()
|
|
correct_srv = f"xftp://{fp}:secret123@127.0.0.1:{PORT}"
|
|
assert start_server()
|
|
|
|
src = make_file("authok.bin")
|
|
r = run([XFTP, "send", str(src), str(test_dir / "descriptions"),
|
|
"-s", correct_srv, "-n", "1"], check=False, timeout=60)
|
|
check("9.4 upload with correct password succeeds", r.returncode == 0)
|
|
stop_server()
|
|
|
|
# 9.5 Server no auth, client sends auth → succeeds
|
|
print(" --- 9.5 no server auth, client sends auth ---")
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
fp = (Path(os.environ["XFTP_SERVER_CFG_PATH"]) / "fingerprint").read_text().strip()
|
|
auth_srv = f"xftp://{fp}:anypass@127.0.0.1:{PORT}"
|
|
assert start_server()
|
|
|
|
src = make_file("noauth.bin")
|
|
r = run([XFTP, "send", str(src), str(test_dir / "descriptions"),
|
|
"-s", auth_srv, "-n", "1"], check=False, timeout=60)
|
|
check("9.5 upload with auth to no-auth server succeeds", r.returncode == 0)
|
|
stop_server()
|
|
|
|
# 9.6 Storage quota: exact boundary
|
|
print(" --- 9.6 storage quota boundary ---")
|
|
clean_test_dir()
|
|
# Chunk size is 128KB, so 1MB file = ~8 chunks but stored as one padded chunk per server file
|
|
# Use small quota: allow exactly 2 files of 1MB
|
|
srv = init_server(quota="3mb")
|
|
assert start_server()
|
|
|
|
src1 = make_file("quota1.bin")
|
|
r1 = send_file(src1)
|
|
check("9.6a first file within quota", r1.returncode == 0)
|
|
|
|
src2 = make_file("quota2.bin")
|
|
r2 = send_file(src2)
|
|
check("9.6b second file within quota", r2.returncode == 0)
|
|
|
|
src3 = make_file("quota3.bin")
|
|
r3 = send_file(src3)
|
|
check("9.6c third file rejected", r3.returncode != 0)
|
|
check("9.6d error is QUOTA", "QUOTA" in (r3.stdout + r3.stderr))
|
|
stop_server()
|
|
|
|
# 9.7 File expiration
|
|
# Note: createdAt uses hour-level precision (fileTimePrecision = 3600s).
|
|
# With expire_files_hours=0, TTL=0, and the check is createdAt + TTL < now.
|
|
# Files created in the current hour have createdAt = now (rounded), so
|
|
# createdAt + 0 is NOT < now — they won't expire until the next hour.
|
|
# The check interval is hardcoded at 2 hours and not configurable via INI.
|
|
# This makes expiration untestable in a fast automated test.
|
|
# File expiration IS tested in the Haskell test suite (testFileChunkExpiration)
|
|
# with a 1-second TTL and 1-second check interval configured programmatically.
|
|
print(" --- 9.7 file expiration (skipped: requires hour boundary, tested in Haskell suite) ---")
|
|
|
|
|
|
def test_10_control_port_operations():
|
|
"""Control port: delete, auth failure, invalid commands, stats."""
|
|
global srv
|
|
print("\n=== 10. Control port operations ===")
|
|
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
enable_control_port()
|
|
assert start_server()
|
|
|
|
# 10.1 Control port: command without authentication
|
|
# Server should respond with "AUTH" when no auth has been provided
|
|
print(" --- 10.1 no auth ---")
|
|
src = make_file("ctrldel.bin")
|
|
send_file(src, n=1)
|
|
dd = desc_dir("ctrldel.bin")
|
|
rcv_ids = get_recipient_ids(dd / "rcv1.xftp")
|
|
resp = control_cmd(f"delete {rcv_ids[0]}", auth=False)
|
|
check("10.1 command without auth returns AUTH", resp == "AUTH")
|
|
|
|
# 10.2 Control port: wrong password assigns CPRNone, commands return AUTH
|
|
print(" --- 10.2 wrong password ---")
|
|
s = control_connect()
|
|
auth_resp = control_send_recv(s, "auth wrongpassword")
|
|
check("10.2a wrong password gives CPRNone", auth_resp == "Current role is CPRNone")
|
|
cmd_resp = control_send_recv(s, f"delete {rcv_ids[0]}")
|
|
check("10.2b CPRNone command returns AUTH", cmd_resp == "AUTH")
|
|
s.sendall(b"quit\n")
|
|
s.close()
|
|
|
|
# 10.3 Control port: stats-rts
|
|
# Without +RTS -T, returns "unsupported operation (GHC.Stats.getRTSStats: ...)"
|
|
# With +RTS -T, returns actual GHC runtime stats with "gcs" field etc.
|
|
# Either is a valid non-error response.
|
|
print(" --- 10.3 stats-rts ---")
|
|
resp = control_cmd("stats-rts")
|
|
check("10.3 stats-rts responds",
|
|
"getRTSStats" in resp or "gcs" in resp or "allocated_bytes" in resp)
|
|
|
|
# 10.4 Control port: delete command removes file
|
|
print(" --- 10.4 control port delete ---")
|
|
resp = control_cmd(f"delete {rcv_ids[0]}")
|
|
check("10.4a delete command returns ok", resp == "ok")
|
|
|
|
r = recv_file(dd / "rcv1.xftp")
|
|
check("10.4b recv after control port delete fails", r.returncode != 0)
|
|
check("10.4c error is AUTH", "AUTH" in (r.stdout + r.stderr))
|
|
|
|
# 10.5 Control port: invalid block reason
|
|
print(" --- 10.5 invalid block reason ---")
|
|
src2 = make_file("badblock.bin")
|
|
send_file(src2, n=1)
|
|
dd2 = desc_dir("badblock.bin")
|
|
rcv_ids2 = get_recipient_ids(dd2 / "rcv1.xftp")
|
|
|
|
resp = control_cmd(f"block {rcv_ids2[0]} reason=invalid_reason")
|
|
check("10.5 invalid block reason returns error", resp.startswith("error:"))
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_11_blocked_file_sender_delete():
|
|
"""Blocked file: sender cannot delete it."""
|
|
global srv
|
|
print("\n=== 11. Blocked file: sender delete attempt ===")
|
|
|
|
clean_test_dir()
|
|
srv = init_server()
|
|
enable_control_port()
|
|
assert start_server()
|
|
|
|
src = make_file("blockdel.bin")
|
|
send_file(src, n=1)
|
|
dd = desc_dir("blockdel.bin")
|
|
|
|
rcv_ids = get_recipient_ids(dd / "rcv1.xftp")
|
|
resp = control_cmd(f"block {rcv_ids[0]} reason=spam")
|
|
check("11.1 block succeeded", resp == "ok")
|
|
|
|
# Sender tries to delete — should fail with BLOCKED
|
|
r = del_file(dd / "snd.xftp.private")
|
|
check("11.2 sender delete of blocked file fails", r.returncode != 0)
|
|
check("11.3 error mentions BLOCKED",
|
|
"BLOCKED" in (r.stdout + r.stderr))
|
|
|
|
stop_server()
|
|
|
|
|
|
def test_12_recipient_cascade_and_storage():
|
|
"""Recipient cascade delete and storage accounting."""
|
|
global srv
|
|
print("\n=== 12. Recipient cascade and storage accounting ===")
|
|
|
|
# 12.1 Recipient cascade: delete file, all recipients gone
|
|
print(" --- 12.1 recipient cascade delete (PG) ---")
|
|
clean_test_dir()
|
|
clean_db()
|
|
srv = init_server()
|
|
enable_db_mode()
|
|
store_log = Path(os.environ["XFTP_SERVER_LOG_PATH"]) / "file-server-store.log"
|
|
store_log.unlink(missing_ok=True)
|
|
create_schema()
|
|
ok = start_server("--confirm-migrations", "up")
|
|
check("12.1a PG server started", ok)
|
|
if not ok:
|
|
return
|
|
|
|
src = make_file("cascade.bin")
|
|
send_file(src, n=3)
|
|
|
|
fc_before = db_count("files")
|
|
rc_before = db_count("recipients")
|
|
check(f"12.1b files before delete ({fc_before})", fc_before > 0)
|
|
check(f"12.1c recipients before delete ({rc_before})", rc_before > 0)
|
|
|
|
del_file(desc_dir("cascade.bin") / "snd.xftp.private")
|
|
|
|
fc_after = db_count("files")
|
|
rc_after = db_count("recipients")
|
|
check(f"12.1d files after delete ({fc_after})", fc_after == 0)
|
|
check(f"12.1e recipients cascade deleted ({rc_after})", rc_after == 0)
|
|
|
|
# 12.2 Storage accounting: upload, delete, verify disk
|
|
print(" --- 12.2 storage accounting ---")
|
|
src1 = make_file("stor1.bin")
|
|
r1 = send_file(src1)
|
|
check("12.2a stor1 upload succeeded", r1.returncode == 0)
|
|
src2 = make_file("stor2.bin")
|
|
r2 = send_file(src2)
|
|
check("12.2b stor2 upload succeeded", r2.returncode == 0)
|
|
|
|
files_on_disk = len(list((test_dir / "files").iterdir()))
|
|
check(f"12.2c files on disk after upload ({files_on_disk})", files_on_disk > 0)
|
|
|
|
del_file(desc_dir("stor1.bin") / "snd.xftp.private")
|
|
del_file(desc_dir("stor2.bin") / "snd.xftp.private")
|
|
|
|
files_on_disk = len(list((test_dir / "files").iterdir()))
|
|
check(f"12.2d files on disk after delete ({files_on_disk})", files_on_disk == 0)
|
|
|
|
fc = db_count("files")
|
|
check(f"12.2e DB files after delete ({fc})", fc == 0)
|
|
|
|
stop_server()
|
|
|
|
|
|
# ===================================================================
|
|
# Main
|
|
# ===================================================================
|
|
|
|
if __name__ == "__main__":
|
|
XFTP_SERVER = cabal_bin("xftp-server")
|
|
XFTP = cabal_bin("xftp")
|
|
test_dir = Path.cwd() / "xftp-test"
|
|
|
|
os.environ["XFTP_SERVER_CFG_PATH"] = str(test_dir / "etc")
|
|
os.environ["XFTP_SERVER_LOG_PATH"] = str(test_dir / "var")
|
|
|
|
srv = ""
|
|
|
|
print(f"XFTP server: {XFTP_SERVER}")
|
|
print(f"XFTP client: {XFTP}")
|
|
print(f"Test dir: {test_dir}")
|
|
print(f"PGHOST: {os.environ.get('PGHOST', '(default)')}")
|
|
|
|
# Verify prerequisites
|
|
r = psql("SELECT 1;", check=False)
|
|
if r.returncode != 0:
|
|
sys.exit(f"Cannot connect to PostgreSQL as {PG_ADMIN_USER}. Is it running?")
|
|
r = psql("SELECT 1;", user=DB_USER, db="postgres", check=False)
|
|
if r.returncode != 0:
|
|
sys.exit(f"PostgreSQL user '{DB_USER}' does not exist.\n"
|
|
f"Run: psql -U {PG_ADMIN_USER} -c \"CREATE USER {DB_USER} WITH SUPERUSER;\"")
|
|
|
|
try:
|
|
test_1_basic_memory()
|
|
test_2_basic_postgres()
|
|
test_3_migration_memory_to_pg()
|
|
test_4_migration_pg_to_memory() # continues from test_3 state
|
|
test_4b_send_pg_receive_memory()
|
|
test_5_restart_persistence()
|
|
test_6_edge_cases()
|
|
test_7_blocking()
|
|
test_8_migration_edge_cases()
|
|
test_9_auth_and_access_control()
|
|
test_10_control_port_operations()
|
|
test_11_blocked_file_sender_delete()
|
|
test_12_recipient_cascade_and_storage()
|
|
except Exception:
|
|
stop_server()
|
|
print("\n [ERROR] Unexpected exception:")
|
|
traceback.print_exc()
|
|
FAIL += 1
|
|
finally:
|
|
stop_server()
|
|
# Cleanup
|
|
if test_dir.exists():
|
|
shutil.rmtree(test_dir)
|
|
psql(f"DROP DATABASE IF EXISTS {DB_NAME};", check=False)
|
|
|
|
print(f"\n{'=' * 42}")
|
|
print(f"Results: {PASS} passed, {FAIL} failed")
|
|
print(f"{'=' * 42}")
|
|
sys.exit(1 if FAIL > 0 else 0)
|