import Fastify from 'fastify'; import http from 'http'; import { migrate, query } from './db.js'; import { matchInbound } from './matcher.js'; import { confirmAndDispatch } from './dispatcher.js'; import { createSubscription, simulateNotification } from './bci.js'; import { newId, genRefCode, parsePesos } from './util.js'; const PORT = parseInt(process.env.PORT ?? '3500', 10); const ADMIN_TOKEN = process.env.KUA_CASHIER_ADMIN_TOKEN ?? ''; const PUBLIC_URL = process.env.KUA_CASHIER_PUBLIC_URL ?? ''; // e.g. https://cashier.kua.cl const ALLOWED_NODES = new Set((process.env.KUA_ALLOWED_NODES ?? 'gal,bruno,genesis').split(',').map(s => s.trim())); const TAILSCALE_SOCKET = '/var/run/tailscale/tailscaled.sock'; const DEV = process.env.NODE_ENV !== 'production'; // ─── BCI config (from env / kua-vault) ────────────────────────────────────── const BCI_CFG = { subscriptionKey: process.env.BCI_SUBSCRIPTION_KEY ?? '', webhookSecret: process.env.BCI_WEBHOOK_SECRET ?? '', organizationName: process.env.BCI_ORGANIZATION_NAME ?? '', accountNumber: process.env.BCI_ACCOUNT_NUMBER ?? '', rut: process.env.BCI_ACCOUNT_RUT ?? '', checkDigit: process.env.BCI_ACCOUNT_CHECK_DIGIT ?? '', }; // ─── Fastify ───────────────────────────────────────────────────────────────── const fastify = Fastify({ logger: true }); // ─── Tailscale whois ───────────────────────────────────────────────────────── async function tailscaleWhois(remoteAddr) { return new Promise((resolve) => { const timeout = setTimeout(() => resolve(null), 2000); const req = http.request( { socketPath: TAILSCALE_SOCKET, path: `/localapi/v0/whois?addr=${encodeURIComponent(remoteAddr)}`, method: 'GET' }, (res) => { let data = ''; res.on('data', c => { data += c; }); res.on('end', () => { clearTimeout(timeout); try { const p = JSON.parse(data); resolve(p.Node ? { hostname: p.Node.ComputedName ?? '', tags: p.Node.Tags ?? [], user: p.UserProfile?.LoginName ?? '', } : null); } catch { resolve(null); } }); } ); req.on('error', () => { clearTimeout(timeout); resolve(null); }); req.end(); }); } function isAllowedNode(id) { if (!id) return false; if (id.tags?.includes('tag:admin')) return true; return ALLOWED_NODES.has(id.hostname); } // ─── Auth hook ─────────────────────────────────────────────────────────────── // Public paths: /health, /bci/notify (has its own APIKey check) // Everything else: Tailscale whois OR Bearer token fastify.addHook('onRequest', async (req, reply) => { const { url } = req; if (url === '/health' || url.startsWith('/bci/notify')) return; // Bearer token const auth = req.headers.authorization; if (auth?.startsWith('Bearer ') && ADMIN_TOKEN && auth.slice(7) === ADMIN_TOKEN) return; // Tailscale whois const isLocal = ['127.0.0.1', '::1', '::ffff:127.0.0.1'].includes(req.ip) || req.ip.startsWith('172.'); if (isLocal) { req.identity = { hostname: 'bruno', tags: ['tag:admin'] }; return; } if (!DEV) { const id = await tailscaleWhois(`${req.ip}:${req.socket.remotePort ?? 0}`); if (!id || !isAllowedNode(id)) { return reply.code(403).send({ error: 'not authorized' }); } req.identity = id; } }); // ─── Routes: health ─────────────────────────────────────────────────────────── fastify.get('/health', async () => ({ ok: true, service: 'kua-cashier' })); // ─── Routes: BCI webhook ────────────────────────────────────────────────────── // BCI payload fields (discovered from sandbox): // idEvento, idContable, monto, fechaTransaccion, fechaNotificacion, // rutCliente, dvCliente, tipoTransaccion, anulacion, numeroReintento, // concepto.{ nombre, rut, dv, numeroCuenta, codigoBanco, nroOperacion, mensaje } // // Auth: secret embedded as ?token=... in the callback URL we register with BCI. // BCI does NOT echo APIKey back in the webhook body. fastify.post('/bci/notify', async (req, reply) => { const raw = req.body ?? {}; fastify.log.info({ raw }, 'bci webhook received'); // Verify token embedded in callback URL query string const token = req.query?.token ?? ''; if (BCI_CFG.webhookSecret && token !== BCI_CFG.webhookSecret) { fastify.log.warn('bci webhook: token mismatch'); return reply.code(401).send({ error: 'invalid token' }); } // Skip reversals if (raw.anulacion === true) { return reply.send({ ok: true, skipped: 'anulacion' }); } // monto is an integer in BCI's payload const amount = typeof raw.monto === 'number' ? raw.monto : parsePesos(raw.monto); if (!amount || amount <= 0) { return reply.code(400).send({ error: 'missing or invalid monto' }); } // Sender RUT: rutCliente + dvCliente → "88777666-0" const senderRut = raw.rutCliente ? `${raw.rutCliente}-${raw.dvCliente ?? ''}`.replace(/-$/, '') : null; // Reference code lives in concepto.mensaje (customer fills in transfer description) const description = raw.concepto?.mensaje ?? null; const bankRef = raw.idEvento ?? null; const receivedAt = raw.fechaTransaccion ?? raw.fechaNotificacion ?? new Date().toISOString(); const { rows: [account] } = await query( `SELECT id FROM accounts WHERE bank = 'bci' AND active = true LIMIT 1` ); const id = newId(); try { await query( `INSERT INTO inbound_payments (id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw) VALUES ($1, 'bci_webhook', $2, $3, $4, $5, $6, $7, $8)`, [id, account?.id ?? null, amount, senderRut, description, receivedAt, bankRef, JSON.stringify(raw)] ); } catch (err) { if (err.code === '23505') { fastify.log.info({ bankRef }, 'bci webhook: duplicate, skipping'); return reply.send({ ok: true, skipped: 'duplicate' }); } throw err; } const { rows: [inbound] } = await query(`SELECT * FROM inbound_payments WHERE id = $1`, [id]); await runMatcher(inbound); // BCI expects this specific response shape (S003 sandbox warning goes away with it) return reply.send({ Data: { Status: 'RECIBIDO' }, Links: {}, Meta: {} }); }); // ─── Routes: generic inbound (bank-parser, email, etc.) ─────────────────────── fastify.post('/inbound/notify', async (req, reply) => { const body = req.body ?? {}; const amount = parsePesos(body.amount ?? body.amountMinor); if (!amount || amount <= 0) return reply.code(400).send({ error: 'amount required' }); // Resolve account_id from bank + account_number if provided let accountId = body.account_id ?? null; if (!accountId && body.bank && body.account_number) { const { rows: [acc] } = await query( `SELECT id FROM accounts WHERE bank = $1 AND account_number = $2`, [body.bank, body.account_number] ); accountId = acc?.id ?? null; } const bankRef = body.bank_ref ?? body.bankRef ?? null; const id = newId(); try { await query( `INSERT INTO inbound_payments (id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, [ id, body.source ?? 'bank_parser', accountId, amount, body.sender_rut ?? body.senderRut ?? null, body.description ?? null, body.received_at ?? body.receivedAt ?? body.postedAt ?? new Date().toISOString(), bankRef, JSON.stringify(body), ] ); } catch (err) { if (err.code === '23505') return reply.send({ ok: true, skipped: 'duplicate' }); throw err; } const { rows: [inbound] } = await query(`SELECT * FROM inbound_payments WHERE id = $1`, [id]); await runMatcher(inbound); return reply.send({ ok: true, inbound_id: id }); }); // ─── Routes: payment intents ────────────────────────────────────────────────── fastify.post('/intent', async (req, reply) => { const { source_app, account_id, nominal_amount, sender_rut, callback_url, callback_secret, expires_in = 86400, metadata } = req.body ?? {}; if (!source_app) return reply.code(400).send({ error: 'source_app required' }); if (!account_id) return reply.code(400).send({ error: 'account_id required' }); if (!nominal_amount || nominal_amount <= 0) return reply.code(400).send({ error: 'nominal_amount required' }); if (!callback_url) return reply.code(400).send({ error: 'callback_url required' }); const { rows: [account] } = await query(`SELECT * FROM accounts WHERE id = $1 AND active = true`, [account_id]); if (!account) return reply.code(404).send({ error: 'account not found' }); // Find lowest free slot: try nominal_amount-1, nominal_amount-2, ... let exactAmount = null; let jitter = 0; for (let j = 0; j <= 50; j++) { const candidate = nominal_amount - j; if (candidate <= 0) break; const { rows } = await query( `SELECT 1 FROM payment_intents WHERE exact_amount = $1 AND account_id = $2 AND status = 'pending'`, [candidate, account_id] ); if (rows.length === 0) { exactAmount = candidate; jitter = j; break; } } if (exactAmount === null) return reply.code(409).send({ error: 'no available amount slot — too many concurrent intents at this amount' }); // Generate unique reference code let refCode; for (let i = 0; i < 10; i++) { const candidate = genRefCode(); const { rows } = await query(`SELECT 1 FROM payment_intents WHERE reference_code = $1`, [candidate]); if (rows.length === 0) { refCode = candidate; break; } } if (!refCode) return reply.code(500).send({ error: 'could not generate unique reference code' }); const id = newId(); const expiresAt = new Date(Date.now() + expires_in * 1000).toISOString(); await query( `INSERT INTO payment_intents (id, source_app, account_id, nominal_amount, exact_amount, jitter, sender_rut, reference_code, callback_url, callback_secret, metadata, expires_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`, [id, source_app, account_id, nominal_amount, exactAmount, jitter, sender_rut ?? null, refCode, callback_url, callback_secret ?? null, metadata ? JSON.stringify(metadata) : null, expiresAt] ); return reply.code(201).send({ intent_id: id, exact_amount: exactAmount, jitter, reference_code: refCode, account: { bank: account.bank, account_number: account.account_number, owner_name: account.owner_name, owner_rut: account.owner_rut, }, expires_at: expiresAt, }); }); fastify.get('/intent/:id', async (req, reply) => { const { rows: [intent] } = await query(`SELECT * FROM payment_intents WHERE id = $1`, [req.params.id]); if (!intent) return reply.code(404).send({ error: 'not found' }); return reply.send(intent); }); fastify.delete('/intent/:id', async (req, reply) => { const { rows: [intent] } = await query(`SELECT id, status FROM payment_intents WHERE id = $1`, [req.params.id]); if (!intent) return reply.code(404).send({ error: 'not found' }); if (intent.status === 'matched') return reply.code(409).send({ error: 'already matched' }); await query(`UPDATE payment_intents SET status = 'cancelled' WHERE id = $1`, [intent.id]); return reply.send({ ok: true }); }); // ─── Routes: admin ──────────────────────────────────────────────────────────── // List unaccounted inbound payments fastify.get('/admin/inbound', async (req) => { const limit = Math.min(parseInt(req.query.limit ?? '50', 10), 200); const { rows } = await query( `SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at DESC LIMIT $1`, [limit] ); return { unaccounted: rows, count: rows.length }; }); // Manually trigger a match sweep over all unaccounted inbounds fastify.post('/admin/match', async (_req, reply) => { const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC`); let matched = 0; for (const inbound of rows) { const result = await matchInbound(inbound); if (result) { await confirmAndDispatch(result.intent, inbound, result.confidence); matched++; } } return reply.send({ swept: rows.length, matched }); }); // Expire stale intents fastify.post('/admin/expire', async (_req, reply) => { const { rowCount } = await query( `UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()` ); return reply.send({ expired: rowCount }); }); // List all accounts fastify.get('/admin/accounts', async () => { const { rows } = await query(`SELECT id, bank, account_number, owner_name, owner_rut, active, bci_subscription_id FROM accounts ORDER BY created_at`); return { accounts: rows }; }); // Register a new account fastify.post('/admin/accounts', async (req, reply) => { const { bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret } = req.body ?? {}; if (!bank || !account_number || !owner_name || !owner_rut) { return reply.code(400).send({ error: 'bank, account_number, owner_name, owner_rut required' }); } const id = newId(); await query( `INSERT INTO accounts (id, bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret) VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (bank, account_number) DO UPDATE SET owner_name = EXCLUDED.owner_name, owner_rut = EXCLUDED.owner_rut, check_digit = EXCLUDED.check_digit, api_key = EXCLUDED.api_key, webhook_secret = EXCLUDED.webhook_secret`, [id, bank, account_number, owner_name, owner_rut, check_digit ?? null, api_key ?? null, webhook_secret ?? null] ); const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE bank = $1 AND account_number = $2`, [bank, account_number]); return reply.code(201).send(acc); }); // Register BCI webhook subscription for an account fastify.post('/admin/bci/subscribe', async (req, reply) => { const { account_id } = req.body ?? {}; const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']); if (!acc) return reply.code(404).send({ error: 'account not found' }); const callbackUrl = `${PUBLIC_URL}/bci/notify`; const result = await createSubscription({ organizationName: acc.owner_name, account: acc.account_number, rut: acc.owner_rut, checkDigit: acc.check_digit, callbackUrl, subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey, webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret, }); await query(`UPDATE accounts SET bci_subscription_id = $1 WHERE id = $2`, [JSON.stringify(result), acc.id]); return reply.send({ ok: true, subscription: result, callback_url: callbackUrl }); }); // Fire a test BCI notification (discovers real payload shape) fastify.post('/admin/bci/simulate', async (req, reply) => { const { account_id, amount = 1000 } = req.body ?? {}; const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']); if (!acc) return reply.code(404).send({ error: 'account not found' }); const result = await simulateNotification({ callbackUrl: `${PUBLIC_URL}/bci/notify`, amount, subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey, webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret, }); return reply.send({ ok: true, result }); }); // ─── Matcher helper ─────────────────────────────────────────────────────────── async function runMatcher(inbound) { const result = await matchInbound(inbound); if (result) { fastify.log.info({ intent_id: result.intent.id, inbound_id: inbound.id, confidence: result.confidence }, 'match found'); await confirmAndDispatch(result.intent, inbound, result.confidence); } } // ─── Periodic sweep ─────────────────────────────────────────────────────────── // Re-attempt unaccounted inbounds every 5 minutes in case a pending intent // was registered after the inbound arrived (race condition). function startSweepCron() { setInterval(async () => { try { // Expire stale intents first await query(`UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()`); const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC LIMIT 100`); for (const inbound of rows) { const result = await matchInbound(inbound); if (result) await confirmAndDispatch(result.intent, inbound, result.confidence); } } catch (err) { fastify.log.error({ err }, 'sweep cron error'); } }, 5 * 60 * 1000); } // ─── Start ──────────────────────────────────────────────────────────────────── try { await migrate(); fastify.log.info('database migrated'); await fastify.listen({ port: PORT, host: '0.0.0.0' }); startSweepCron(); fastify.log.info(`kua-cashier listening on ${PORT}`); } catch (err) { fastify.log.error(err); process.exit(1); }