kua-cashier/server.js

430 lines
18 KiB
JavaScript

import { timingSafeEqual } from 'crypto';
import http from 'http';
import Fastify from 'fastify';
import { migrate, query } from './db.js';
import { matchInbound } from './matcher.js';
import { confirmAndDispatch } from './dispatcher.js';
import { createSubscription, simulateNotification } from './bci.js';
import { newId, genRefCode, parsePesos } from './util.js';
/**
 * Turn a comma-separated list of node hostnames into a Set.
 *
 * Each entry is trimmed and blank entries are discarded. Dropping blanks
 * matters: an empty or trailing-comma list would otherwise put '' in the set,
 * and '' is exactly the hostname tailscaleWhois reports for a node that has no
 * ComputedName, so such a node would be treated as allowed.
 *
 * @param {string} raw - Comma-separated hostnames.
 * @returns {Set<string>} The non-empty, trimmed hostnames.
 */
function parseAllowedNodes(raw) {
  return new Set(
    raw
      .split(',')
      .map((name) => name.trim())
      .filter((name) => name !== ''),
  );
}

// ─── Process configuration (read once from the environment) ─────────────────
const PORT = Number.parseInt(process.env.PORT ?? '3500', 10);
// Bearer token accepted by the auth hook; empty string disables token auth.
const ADMIN_TOKEN = process.env.KUA_CASHIER_ADMIN_TOKEN ?? '';
const PUBLIC_URL = process.env.KUA_CASHIER_PUBLIC_URL ?? ''; // e.g. https://cashier.kua.cl
// Hostnames that isAllowedNode() accepts without the admin tag.
const ALLOWED_NODES = parseAllowedNodes(process.env.KUA_ALLOWED_NODES ?? 'gal,bruno,genesis');
const TAILSCALE_SOCKET = '/var/run/tailscale/tailscaled.sock';
// NOTE(review): DEV is true whenever NODE_ENV is anything other than
// 'production', including unset — confirm that default is intended.
const DEV = process.env.NODE_ENV !== 'production';
// ─── BCI config (from env / kua-vault) ──────────────────────────────────────
// Default BCI credentials and account details. Each field is read from the
// named environment variable and falls back to '' when that variable is unset.
const BCI_CFG = Object.fromEntries(
  Object.entries({
    subscriptionKey: 'BCI_SUBSCRIPTION_KEY',
    webhookSecret: 'BCI_WEBHOOK_SECRET',
    organizationName: 'BCI_ORGANIZATION_NAME',
    accountNumber: 'BCI_ACCOUNT_NUMBER',
    rut: 'BCI_ACCOUNT_RUT',
    checkDigit: 'BCI_ACCOUNT_CHECK_DIGIT',
  }).map(([field, envName]) => [field, process.env[envName] ?? '']),
);
// ─── Fastify ─────────────────────────────────────────────────────────────────
// The one Fastify instance every hook and route below is registered on;
// created with the `logger` option switched on.
const fastify = Fastify({
  logger: true,
});
// ─── Tailscale whois ─────────────────────────────────────────────────────────
/**
 * Ask the local tailscaled daemon which node is behind a peer address.
 *
 * Issues `GET /localapi/v0/whois?addr=…` over the Unix socket at
 * TAILSCALE_SOCKET and maps the JSON answer to a small identity object.
 *
 * @param {string} remoteAddr - Peer address to look up (the caller passes `ip:port`).
 * @returns {Promise<{hostname: string, tags: string[], user: string} | null>}
 *   The identity, or null when the request errors, takes longer than two
 *   seconds, the body is not valid JSON, or the answer has no `Node`.
 *   The promise never rejects.
 */
async function tailscaleWhois(remoteAddr) {
  return new Promise((resolve) => {
    let settled = false;
    let timer;

    // Single exit point: whichever of response / error / timeout fires first
    // wins, and later events become no-ops.
    const settle = (identity) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve(identity);
    };

    const req = http.request(
      {
        socketPath: TAILSCALE_SOCKET,
        path: `/localapi/v0/whois?addr=${encodeURIComponent(remoteAddr)}`,
        method: 'GET',
      },
      (res) => {
        // Decode as a stream so a multi-byte character split across two
        // chunks is not corrupted by per-chunk string conversion.
        res.setEncoding('utf8');
        let body = '';
        res.on('data', (chunk) => { body += chunk; });
        res.on('error', () => settle(null));
        res.on('end', () => {
          try {
            const parsed = JSON.parse(body);
            settle(parsed.Node ? {
              hostname: parsed.Node.ComputedName ?? '',
              tags: parsed.Node.Tags ?? [],
              user: parsed.UserProfile?.LoginName ?? '',
            } : null);
          } catch {
            settle(null);
          }
        });
      },
    );

    timer = setTimeout(() => {
      settle(null);
      // Tear the request down as well; otherwise the socket stays open and
      // its callbacks can still fire long after the caller has moved on.
      req.destroy();
    }, 2000);

    req.on('error', () => settle(null));
    req.end();
  });
}
/**
 * Decide whether a whois identity may call protected routes.
 *
 * @param {{hostname?: string, tags?: string[]} | null | undefined} id
 *   Identity as produced by tailscaleWhois, or a falsy value when unknown.
 * @returns {boolean} false for a missing identity; true when the identity
 *   carries the `tag:admin` tag; otherwise whether its hostname is in
 *   ALLOWED_NODES.
 */
function isAllowedNode(id) {
  if (id) {
    const carriesAdminTag = id.tags?.includes('tag:admin');
    return carriesAdminTag ? true : ALLOWED_NODES.has(id.hostname);
  }
  return false;
}
// ─── Auth hook ───────────────────────────────────────────────────────────────
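// Public paths: /health, /bci/notify (which verifies its own ?token= query secret)
// Everything else: Bearer admin token, a local peer (loopback or 172.x address),
// or — in production only — an allowed Tailscale node resolved via whois.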
/**
 * Is `ip` a loopback address or inside the private 172.16.0.0/12 block?
 *
 * Only 172.16.x.x – 172.31.x.x are private; the rest of 172.0.0.0/8 is
 * publicly routable and must not be trusted as "local".
 *
 * @param {string} ip - Remote address as reported by Fastify (`req.ip`).
 * @returns {boolean}
 */
function isLocalPeer(ip) {
  if (['127.0.0.1', '::1', '::ffff:127.0.0.1'].includes(ip)) return true;
  const match = /^172\.(\d{1,3})\./.exec(ip);
  if (!match) return false;
  const secondOctet = Number(match[1]);
  return secondOctet >= 16 && secondOctet <= 31;
}

/**
 * Does the Authorization header carry the configured admin token?
 *
 * The comparison is constant-time so response timing does not reveal how much
 * of a guessed token was correct. Always false when ADMIN_TOKEN is empty.
 *
 * @param {string | undefined} header - Raw `Authorization` header value.
 * @returns {boolean}
 */
function isAdminBearer(header) {
  if (!ADMIN_TOKEN || !header?.startsWith('Bearer ')) return false;
  const presented = Buffer.from(header.slice(7));
  const expected = Buffer.from(ADMIN_TOKEN);
  return presented.length === expected.length && timingSafeEqual(presented, expected);
}

/**
 * Global auth gate, evaluated in this order:
 *   1. `/health` and `/bci/notify` pass through untouched.
 *   2. A valid `Authorization: Bearer <ADMIN_TOKEN>` header passes.
 *   3. A local peer passes and is given a fixed admin identity.
 *   4. When DEV is true nothing further is checked.
 *   5. Otherwise the peer must resolve, via Tailscale whois, to a node that
 *      isAllowedNode() accepts; if not the request is answered with 403.
 * On success in steps 3 and 5 the identity is stored on `req.identity`.
 */
fastify.addHook('onRequest', async (req, reply) => {
  // Match on the path alone so a query string cannot change the decision.
  const path = req.url.split('?', 1)[0];
  if (path === '/health' || path === '/bci/notify') return;

  if (isAdminBearer(req.headers.authorization)) return;

  if (isLocalPeer(req.ip)) {
    req.identity = { hostname: 'bruno', tags: ['tag:admin'] };
    return;
  }

  // NOTE(review): outside production every remaining request is let through
  // without an identity — confirm this is only ever used on dev machines.
  if (DEV) return;

  const id = await tailscaleWhois(`${req.ip}:${req.socket.remotePort ?? 0}`);
  if (!id || !isAllowedNode(id)) {
    return reply.code(403).send({ error: 'not authorized' });
  }
  req.identity = id;
});
// ─── Routes: health ───────────────────────────────────────────────────────────
// Liveness probe: always answers with a fixed payload and touches no dependency.
fastify.get('/health', async function healthCheck() {
  return { ok: true, service: 'kua-cashier' };
});
// ─── Routes: BCI webhook ──────────────────────────────────────────────────────
// BCI payload fields (discovered from sandbox):
// idEvento, idContable, monto, fechaTransaccion, fechaNotificacion,
// rutCliente, dvCliente, tipoTransaccion, anulacion, numeroReintento,
// concepto.{ nombre, rut, dv, numeroCuenta, codigoBanco, nroOperacion, mensaje }
//
// Auth: secret embedded as ?token=... in the callback URL we register with BCI.
// BCI does NOT echo APIKey back in the webhook body.
/**
 * Constant-time comparison of the callback token against the expected secret.
 *
 * @param {unknown} presented - Value of the `token` query parameter (may be
 *   missing, or an array when the parameter is repeated).
 * @param {string} secret - The configured webhook secret.
 * @returns {boolean} true only for a string that equals `secret`.
 */
function bciTokenMatches(presented, secret) {
  if (typeof presented !== 'string') return false;
  const given = Buffer.from(presented);
  const expected = Buffer.from(secret);
  return given.length === expected.length && timingSafeEqual(given, expected);
}

/**
 * BCI webhook receiver.
 *
 * Flow: verify the `?token=` secret (skipped when no secret is configured) →
 * ignore reversals → validate `monto` → store the payment in
 * `inbound_payments` → try to match it against a pending intent → acknowledge.
 *
 * Responses:
 *   401 `{ error: 'invalid token' }`            token mismatch
 *   200 `{ ok: true, skipped: 'anulacion' }`     reversal
 *   400 `{ error: 'missing or invalid monto' }`  bad amount
 *   200 `{ ok: true, skipped: 'duplicate' }`     unique violation on insert
 *   200 `{ Data: { Status: 'RECIBIDO' }, … }`    stored
 */
fastify.post('/bci/notify', async (req, reply) => {
  // Authenticate first so bodies from unauthenticated callers never reach the log.
  // NOTE(review): with an empty BCI_WEBHOOK_SECRET this endpoint accepts any caller.
  const secret = BCI_CFG.webhookSecret;
  if (secret && !bciTokenMatches(req.query?.token, secret)) {
    fastify.log.warn('bci webhook: token mismatch');
    return reply.code(401).send({ error: 'invalid token' });
  }

  const raw = req.body ?? {};
  fastify.log.info({ raw }, 'bci webhook received');

  // Skip reversals
  if (raw.anulacion === true) {
    return reply.send({ ok: true, skipped: 'anulacion' });
  }

  // monto is an integer in BCI's payload; anything else goes through parsePesos
  const amount = typeof raw.monto === 'number' ? raw.monto : parsePesos(raw.monto);
  if (!amount || amount <= 0) {
    return reply.code(400).send({ error: 'missing or invalid monto' });
  }

  // Sender RUT: rutCliente + dvCliente → "88777666-0" (no trailing dash when dv is absent)
  const senderRut = raw.rutCliente
    ? `${raw.rutCliente}-${raw.dvCliente ?? ''}`.replace(/-$/, '')
    : null;
  // Reference code lives in concepto.mensaje (customer fills in transfer description)
  const description = raw.concepto?.mensaje ?? null;
  const bankRef = raw.idEvento ?? null;
  const receivedAt = raw.fechaTransaccion ?? raw.fechaNotificacion ?? new Date().toISOString();

  // NOTE(review): picks an arbitrary active BCI account; with more than one
  // active BCI account the payment may be attributed to the wrong one.
  const { rows: [account] } = await query(
    `SELECT id FROM accounts WHERE bank = 'bci' AND active = true LIMIT 1`
  );

  const id = newId();
  let inbound;
  try {
    // RETURNING * hands back the stored row (including column defaults)
    // without a second round trip.
    ({ rows: [inbound] } = await query(
      `INSERT INTO inbound_payments
         (id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw)
       VALUES ($1, 'bci_webhook', $2, $3, $4, $5, $6, $7, $8)
       RETURNING *`,
      [id, account?.id ?? null, amount, senderRut, description, receivedAt, bankRef, JSON.stringify(raw)]
    ));
  } catch (err) {
    // 23505 = unique_violation: this notification was already stored.
    if (err.code === '23505') {
      fastify.log.info({ bankRef }, 'bci webhook: duplicate, skipping');
      return reply.send({ ok: true, skipped: 'duplicate' });
    }
    throw err;
  }

  // The payment is already persisted, so a matching/dispatch failure must not
  // turn the acknowledgement into a 500; log it and acknowledge anyway.
  try {
    await runMatcher(inbound);
  } catch (err) {
    fastify.log.error({ err, inbound_id: id }, 'bci webhook: matching failed after inbound was stored');
  }

  // BCI expects this specific response shape (S003 sandbox warning goes away with it)
  return reply.send({ Data: { Status: 'RECIBIDO' }, Links: {}, Meta: {} });
});
// ─── Routes: generic inbound (bank-parser, email, etc.) ───────────────────────
/**
 * Generic inbound-payment intake (bank parser, e-mail scraper, …).
 *
 * Body fields read: `amount` | `amountMinor`, `account_id` or
 * (`bank` + `account_number`), `bank_ref` | `bankRef`, `source`,
 * `sender_rut` | `senderRut`, `description`,
 * `received_at` | `receivedAt` | `postedAt`.
 *
 * Responses:
 *   400 `{ error: 'amount required' }`        amount missing or not positive
 *   200 `{ ok: true, skipped: 'duplicate' }`  unique violation on insert
 *   200 `{ ok: true, inbound_id }`            stored
 */
fastify.post('/inbound/notify', async (req, reply) => {
  const body = req.body ?? {};
  const amount = parsePesos(body.amount ?? body.amountMinor);
  if (!amount || amount <= 0) return reply.code(400).send({ error: 'amount required' });

  // Resolve account_id from bank + account_number if provided
  let accountId = body.account_id ?? null;
  if (!accountId && body.bank && body.account_number) {
    const { rows: [acc] } = await query(
      `SELECT id FROM accounts WHERE bank = $1 AND account_number = $2`,
      [body.bank, body.account_number]
    );
    accountId = acc?.id ?? null;
  }

  const bankRef = body.bank_ref ?? body.bankRef ?? null;
  const id = newId();
  let inbound;
  try {
    // RETURNING * hands back the stored row without a second round trip.
    ({ rows: [inbound] } = await query(
      `INSERT INTO inbound_payments
         (id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw)
       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
       RETURNING *`,
      [
        id,
        body.source ?? 'bank_parser',
        accountId,
        amount,
        body.sender_rut ?? body.senderRut ?? null,
        body.description ?? null,
        body.received_at ?? body.receivedAt ?? body.postedAt ?? new Date().toISOString(),
        bankRef,
        JSON.stringify(body),
      ]
    ));
  } catch (err) {
    // 23505 = unique_violation: this payment was already recorded.
    if (err.code === '23505') return reply.send({ ok: true, skipped: 'duplicate' });
    throw err;
  }

  // The row is already stored. Answering 500 here would invite the caller to
  // retry, which can only produce a duplicate (or, presumably, a second row
  // when no bank_ref is supplied — verify against the table's constraints).
  try {
    await runMatcher(inbound);
  } catch (err) {
    fastify.log.error({ err, inbound_id: id }, 'inbound notify: matching failed after inbound was stored');
  }

  return reply.send({ ok: true, inbound_id: id });
});
// ─── Routes: payment intents ──────────────────────────────────────────────────
/** Largest number of pesos an intent may be reduced by while looking for a free amount. */
const MAX_INTENT_JITTER = 50;

/**
 * Coerce a JSON number or a non-blank numeric string to a number.
 *
 * @param {unknown} value
 * @returns {number} The numeric value, or NaN for every other input.
 */
function toNumberOrNaN(value) {
  if (typeof value === 'number') return value;
  if (typeof value === 'string' && value.trim() !== '') return Number(value);
  return Number.NaN;
}

/**
 * @param {unknown} value
 * @returns {boolean} true when `value` is a string that parses as an absolute URL.
 */
function isAbsoluteUrl(value) {
  if (typeof value !== 'string') return false;
  try {
    return Boolean(new URL(value));
  } catch {
    return false;
  }
}

/**
 * Create a payment intent.
 *
 * Body: `source_app`, `account_id`, `nominal_amount`, `callback_url`
 * (all required), plus optional `sender_rut`, `callback_secret`, `metadata`
 * and `expires_in` (seconds, default 86400).
 *
 * The intent is assigned an `exact_amount`: the highest amount in
 * [nominal − MAX_INTENT_JITTER, nominal] (never below 1) that no other
 * pending intent on the same account is using. `jitter` is the difference.
 *
 * Responses: 400 on validation errors, 404 for an unknown/inactive account,
 * 409 when every candidate amount is taken, 500 when no unique reference code
 * could be generated, 201 with the payment instructions otherwise.
 */
fastify.post('/intent', async (req, reply) => {
  const {
    source_app,
    account_id,
    nominal_amount,
    sender_rut,
    callback_url,
    callback_secret,
    expires_in = 86400,
    metadata,
  } = req.body ?? {};

  if (!source_app) return reply.code(400).send({ error: 'source_app required' });
  if (!account_id) return reply.code(400).send({ error: 'account_id required' });

  // Reject NaN, fractions and non-numeric input up front; previously a value
  // such as "abc" slipped through and turned every candidate amount into NaN.
  const nominal = toNumberOrNaN(nominal_amount);
  if (!Number.isSafeInteger(nominal) || nominal <= 0) {
    return reply.code(400).send({ error: 'nominal_amount required' });
  }

  if (!callback_url) return reply.code(400).send({ error: 'callback_url required' });
  if (!isAbsoluteUrl(callback_url)) {
    return reply.code(400).send({ error: 'callback_url must be an absolute URL' });
  }

  // An unusable expires_in used to surface as a RangeError from toISOString().
  const ttlSeconds = toNumberOrNaN(expires_in);
  const expiresDate = new Date(Date.now() + ttlSeconds * 1000);
  if (!(ttlSeconds > 0) || Number.isNaN(expiresDate.getTime())) {
    return reply.code(400).send({ error: 'expires_in must be a positive number of seconds' });
  }
  const expiresAt = expiresDate.toISOString();

  const { rows: [account] } = await query(`SELECT * FROM accounts WHERE id = $1 AND active = true`, [account_id]);
  if (!account) return reply.code(404).send({ error: 'account not found' });

  // Find the free slot closest to the nominal amount: nominal, nominal-1, …
  // One query fetches every taken amount in the window instead of probing
  // each candidate separately.
  const lowestCandidate = Math.max(nominal - MAX_INTENT_JITTER, 1);
  const { rows: takenRows } = await query(
    `SELECT exact_amount FROM payment_intents
      WHERE account_id = $1 AND status = 'pending' AND exact_amount BETWEEN $2 AND $3`,
    [account_id, lowestCandidate, nominal]
  );
  // Number() because the driver may hand numeric columns back as strings.
  const takenAmounts = new Set(takenRows.map((row) => Number(row.exact_amount)));

  let exactAmount = null;
  let jitter = 0;
  for (let candidate = nominal; candidate >= lowestCandidate; candidate--) {
    if (!takenAmounts.has(candidate)) {
      exactAmount = candidate;
      jitter = nominal - candidate;
      break;
    }
  }
  if (exactAmount === null) return reply.code(409).send({ error: 'no available amount slot — too many concurrent intents at this amount' });
  // NOTE(review): the slot check and the INSERT below are not atomic, so two
  // simultaneous requests can still pick the same amount unless the table
  // enforces uniqueness — confirm against the schema.

  // Generate unique reference code
  let refCode;
  for (let attempt = 0; attempt < 10; attempt++) {
    const candidate = genRefCode();
    const { rows } = await query(`SELECT 1 FROM payment_intents WHERE reference_code = $1`, [candidate]);
    if (rows.length === 0) { refCode = candidate; break; }
  }
  if (!refCode) return reply.code(500).send({ error: 'could not generate unique reference code' });

  const id = newId();
  await query(
    `INSERT INTO payment_intents
       (id, source_app, account_id, nominal_amount, exact_amount, jitter, sender_rut,
        reference_code, callback_url, callback_secret, metadata, expires_at)
     VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`,
    [id, source_app, account_id, nominal, exactAmount, jitter, sender_rut ?? null,
      refCode, callback_url, callback_secret ?? null, metadata ? JSON.stringify(metadata) : null, expiresAt]
  );

  return reply.code(201).send({
    intent_id: id,
    exact_amount: exactAmount,
    jitter,
    reference_code: refCode,
    account: {
      bank: account.bank,
      account_number: account.account_number,
      owner_name: account.owner_name,
      owner_rut: account.owner_rut,
    },
    expires_at: expiresAt,
  });
});
// Fetch one payment intent by id: the full row on success, 404 when it does not exist.
// NOTE(review): SELECT * means callback_secret is part of the response — confirm that is intended.
fastify.get('/intent/:id', async (req, reply) => {
  const lookup = await query(`SELECT * FROM payment_intents WHERE id = $1`, [req.params.id]);
  const [intent] = lookup.rows;
  if (intent) {
    return reply.send(intent);
  }
  return reply.code(404).send({ error: 'not found' });
});
/**
 * Cancel a payment intent.
 *
 * Responses: 200 `{ ok: true }` when the intent is now cancelled,
 * 404 when the id is unknown, 409 when the intent has already been matched.
 */
fastify.delete('/intent/:id', async (req, reply) => {
  // Check and update in one statement: with a separate SELECT an intent that
  // gets matched between the check and the UPDATE would still be cancelled.
  const { rows: [cancelled] } = await query(
    `UPDATE payment_intents SET status = 'cancelled'
      WHERE id = $1 AND status IS DISTINCT FROM 'matched'
      RETURNING id`,
    [req.params.id]
  );
  if (cancelled) return reply.send({ ok: true });

  // Nothing was updated: either the id does not exist or the intent is matched.
  const { rows: [intent] } = await query(`SELECT id FROM payment_intents WHERE id = $1`, [req.params.id]);
  if (!intent) return reply.code(404).send({ error: 'not found' });
  return reply.code(409).send({ error: 'already matched' });
});
// ─── Routes: admin ────────────────────────────────────────────────────────────
// List unaccounted inbound payments
/**
 * List inbound payments whose status is 'unaccounted', newest first.
 *
 * Query: `limit` — maximum rows, capped at 200. A missing, non-numeric or
 * negative value falls back to 50 instead of reaching the database as
 * `LIMIT NaN` / a negative limit and failing the request.
 *
 * @returns `{ unaccounted: Row[], count: number }`
 */
fastify.get('/admin/inbound', async (req) => {
  const requested = Number.parseInt(req.query.limit ?? '50', 10);
  const limit = Number.isNaN(requested) || requested < 0 ? 50 : Math.min(requested, 200);
  const { rows } = await query(
    `SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at DESC LIMIT $1`,
    [limit]
  );
  return { unaccounted: rows, count: rows.length };
});
// Manually trigger a match sweep over all unaccounted inbounds
/**
 * Run the matcher over every inbound payment with status 'unaccounted',
 * oldest first, and dispatch each match.
 *
 * Each inbound is handled independently: if matching or dispatching one of
 * them throws, the error is logged and counted and the sweep moves on, so a
 * single bad row no longer aborts the whole run with a 500.
 *
 * @returns `{ swept, matched, failed }` — rows examined, rows matched and
 *   dispatched, rows that threw.
 */
fastify.post('/admin/match', async (_req, reply) => {
  const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC`);
  let matched = 0;
  let failed = 0;
  for (const inbound of rows) {
    try {
      const result = await matchInbound(inbound);
      if (result) {
        await confirmAndDispatch(result.intent, inbound, result.confidence);
        matched++;
      }
    } catch (err) {
      failed++;
      fastify.log.error({ err, inbound_id: inbound.id }, 'admin match: inbound failed');
    }
  }
  return reply.send({ swept: rows.length, matched, failed });
});
// Expire stale intents
// Flip every pending intent whose expires_at has passed to 'expired' and
// report how many rows were changed.
fastify.post('/admin/expire', async (_req, reply) => {
  const outcome = await query(
    `UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()`
  );
  const expired = outcome.rowCount;
  return reply.send({ expired });
});
// List all accounts
// Return every account in creation order. Only the listed columns are
// selected, so api_key and webhook_secret are not part of the response.
fastify.get('/admin/accounts', async function listAccounts() {
  const result = await query(`SELECT id, bank, account_number, owner_name, owner_rut, active, bci_subscription_id FROM accounts ORDER BY created_at`);
  const accounts = result.rows;
  return { accounts };
});
// Register a new account
/**
 * Create an account, or update the existing one with the same
 * (bank, account_number).
 *
 * Body: `bank`, `account_number`, `owner_name`, `owner_rut` (required);
 * `check_digit`, `api_key`, `webhook_secret` (optional, stored as NULL when
 * omitted).
 *
 * Responses: 400 when a required field is missing, otherwise 201 with the
 * stored row.
 *
 * NOTE(review): on update, an omitted optional field overwrites the stored
 * value with NULL, and the response is the full row including api_key and
 * webhook_secret — confirm both are intended.
 */
fastify.post('/admin/accounts', async (req, reply) => {
  const { bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret } = req.body ?? {};
  if (!bank || !account_number || !owner_name || !owner_rut) {
    return reply.code(400).send({ error: 'bank, account_number, owner_name, owner_rut required' });
  }

  const id = newId();
  // RETURNING * yields the row exactly as this statement left it, for both the
  // insert and the update path, without a separate read-back query.
  const { rows: [acc] } = await query(
    `INSERT INTO accounts (id, bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret)
     VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
     ON CONFLICT (bank, account_number) DO UPDATE
       SET owner_name = EXCLUDED.owner_name,
           owner_rut = EXCLUDED.owner_rut,
           check_digit = EXCLUDED.check_digit,
           api_key = EXCLUDED.api_key,
           webhook_secret = EXCLUDED.webhook_secret
     RETURNING *`,
    [id, bank, account_number, owner_name, owner_rut, check_digit ?? null, api_key ?? null, webhook_secret ?? null]
  );
  return reply.code(201).send(acc);
});
// Register BCI webhook subscription for an account
/**
 * Register the BCI webhook subscription for one account.
 *
 * Body: `account_id`. The callback registered is `<PUBLIC_URL>/bci/notify`;
 * per-account `api_key` / `webhook_secret` are used when present, otherwise
 * the values from BCI_CFG. The value returned by createSubscription is stored,
 * JSON-encoded, in `accounts.bci_subscription_id`.
 *
 * Responses: 500 when KUA_CASHIER_PUBLIC_URL is not configured, 404 for an
 * unknown account, 200 `{ ok, subscription, callback_url }` otherwise.
 */
fastify.post('/admin/bci/subscribe', async (req, reply) => {
  // Without a public base URL the callback would be the relative path
  // "/bci/notify", which the bank cannot call; refuse instead of registering it.
  const baseUrl = PUBLIC_URL.replace(/\/+$/, '');
  if (!baseUrl) {
    return reply.code(500).send({ error: 'KUA_CASHIER_PUBLIC_URL is not configured' });
  }

  const { account_id } = req.body ?? {};
  const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']);
  if (!acc) return reply.code(404).send({ error: 'account not found' });

  // Trailing slashes were stripped above so the path never becomes "//bci/notify".
  const callbackUrl = `${baseUrl}/bci/notify`;
  const result = await createSubscription({
    organizationName: acc.owner_name,
    account: acc.account_number,
    rut: acc.owner_rut,
    checkDigit: acc.check_digit,
    callbackUrl,
    subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey,
    webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret,
  });
  await query(`UPDATE accounts SET bci_subscription_id = $1 WHERE id = $2`, [JSON.stringify(result), acc.id]);
  return reply.send({ ok: true, subscription: result, callback_url: callbackUrl });
});
// Fire a test BCI notification (discovers real payload shape)
/**
 * Ask BCI to fire a test notification at this service's webhook.
 *
 * Body: `account_id`, optional `amount` (default 1000). The callback passed
 * to simulateNotification is `<PUBLIC_URL>/bci/notify`; per-account
 * `api_key` / `webhook_secret` are used when present, otherwise BCI_CFG.
 *
 * Responses: 500 when KUA_CASHIER_PUBLIC_URL is not configured, 404 for an
 * unknown account, 200 `{ ok, result }` otherwise.
 */
fastify.post('/admin/bci/simulate', async (req, reply) => {
  // A missing base URL would produce the relative callback "/bci/notify";
  // stripping trailing slashes avoids "//bci/notify".
  const baseUrl = PUBLIC_URL.replace(/\/+$/, '');
  if (!baseUrl) {
    return reply.code(500).send({ error: 'KUA_CASHIER_PUBLIC_URL is not configured' });
  }

  const { account_id, amount = 1000 } = req.body ?? {};
  const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']);
  if (!acc) return reply.code(404).send({ error: 'account not found' });

  const result = await simulateNotification({
    callbackUrl: `${baseUrl}/bci/notify`,
    amount,
    subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey,
    webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret,
  });
  return reply.send({ ok: true, result });
});
// ─── Matcher helper ───────────────────────────────────────────────────────────
/**
 * Try to match one inbound payment and, on success, log the match and hand
 * it to confirmAndDispatch.
 *
 * @param {object} inbound - Row from `inbound_payments`; its `id` is logged.
 * @returns {Promise<void>} Resolves once dispatching has finished, or straight
 *   away when matchInbound yields nothing. Errors from matchInbound or
 *   confirmAndDispatch propagate to the caller.
 */
async function runMatcher(inbound) {
  const match = await matchInbound(inbound);
  if (!match) return;

  const { intent, confidence } = match;
  fastify.log.info({ intent_id: intent.id, inbound_id: inbound.id, confidence }, 'match found');
  await confirmAndDispatch(intent, inbound, confidence);
}
// ─── Periodic sweep ───────────────────────────────────────────────────────────
// Re-attempt unaccounted inbounds every 5 minutes in case a pending intent
// was registered after the inbound arrived (race condition).
/**
 * Start the background sweep that runs every five minutes.
 *
 * Each run first marks overdue pending intents as 'expired', then re-runs the
 * matcher over up to 100 of the oldest 'unaccounted' inbound payments.
 *
 * Two safeguards:
 *   - Every inbound is processed in its own try/catch. The batch is always
 *     the oldest rows first, so a row that throws on every run would
 *     otherwise stop the loop at the same place each time and the rows behind
 *     it would never be retried.
 *   - A run is skipped while the previous one is still in flight, so a slow
 *     sweep cannot overlap with the next tick and process the same rows twice.
 *
 * NOTE(review): because of `LIMIT 100` with oldest-first ordering, 100
 * permanently unmatched rows would still keep newer rows out of the sweep.
 *
 * @returns {ReturnType<typeof setInterval>} The interval handle (callers may ignore it).
 */
function startSweepCron() {
  let sweeping = false;

  return setInterval(async () => {
    if (sweeping) return;
    sweeping = true;
    try {
      // Expire stale intents first
      await query(`UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()`);
      const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC LIMIT 100`);
      for (const inbound of rows) {
        try {
          const result = await matchInbound(inbound);
          if (result) await confirmAndDispatch(result.intent, inbound, result.confidence);
        } catch (err) {
          fastify.log.error({ err, inbound_id: inbound.id }, 'sweep cron: inbound failed');
        }
      }
    } catch (err) {
      fastify.log.error({ err }, 'sweep cron error');
    } finally {
      sweeping = false;
    }
  }, 5 * 60 * 1000);
}
// ─── Start ────────────────────────────────────────────────────────────────────
/**
 * Boot sequence: run the database migration, start listening on PORT on all
 * interfaces, then start the periodic sweep. Any rejection propagates to the
 * caller.
 */
async function boot() {
  await migrate();
  fastify.log.info('database migrated');

  const listenOptions = { port: PORT, host: '0.0.0.0' };
  await fastify.listen(listenOptions);

  startSweepCron();
  fastify.log.info(`kua-cashier listening on ${PORT}`);
}

// A failure anywhere during boot is logged and ends the process with exit code 1.
try {
  await boot();
} catch (err) {
  fastify.log.error(err);
  process.exit(1);
}