430 lines
18 KiB
JavaScript
430 lines
18 KiB
JavaScript
import Fastify from 'fastify';
|
|
import http from 'http';
|
|
import { migrate, query } from './db.js';
|
|
import { matchInbound } from './matcher.js';
|
|
import { confirmAndDispatch } from './dispatcher.js';
|
|
import { createSubscription, simulateNotification } from './bci.js';
|
|
import { newId, genRefCode, parsePesos } from './util.js';
|
|
|
|
const PORT = parseInt(process.env.PORT ?? '3500', 10);
|
|
const ADMIN_TOKEN = process.env.KUA_CASHIER_ADMIN_TOKEN ?? '';
|
|
const PUBLIC_URL = process.env.KUA_CASHIER_PUBLIC_URL ?? ''; // e.g. https://cashier.kua.cl
|
|
const ALLOWED_NODES = new Set((process.env.KUA_ALLOWED_NODES ?? 'gal,bruno,genesis').split(',').map(s => s.trim()));
|
|
const TAILSCALE_SOCKET = '/var/run/tailscale/tailscaled.sock';
|
|
const DEV = process.env.NODE_ENV !== 'production';
|
|
|
|
// ─── BCI config (from env / kua-vault) ──────────────────────────────────────
|
|
|
|
const BCI_CFG = {
|
|
subscriptionKey: process.env.BCI_SUBSCRIPTION_KEY ?? '',
|
|
webhookSecret: process.env.BCI_WEBHOOK_SECRET ?? '',
|
|
organizationName: process.env.BCI_ORGANIZATION_NAME ?? '',
|
|
accountNumber: process.env.BCI_ACCOUNT_NUMBER ?? '',
|
|
rut: process.env.BCI_ACCOUNT_RUT ?? '',
|
|
checkDigit: process.env.BCI_ACCOUNT_CHECK_DIGIT ?? '',
|
|
};
|
|
|
|
// ─── Fastify ─────────────────────────────────────────────────────────────────
|
|
|
|
const fastify = Fastify({ logger: true });
|
|
|
|
// ─── Tailscale whois ─────────────────────────────────────────────────────────
|
|
|
|
async function tailscaleWhois(remoteAddr) {
|
|
return new Promise((resolve) => {
|
|
const timeout = setTimeout(() => resolve(null), 2000);
|
|
const req = http.request(
|
|
{ socketPath: TAILSCALE_SOCKET, path: `/localapi/v0/whois?addr=${encodeURIComponent(remoteAddr)}`, method: 'GET' },
|
|
(res) => {
|
|
let data = '';
|
|
res.on('data', c => { data += c; });
|
|
res.on('end', () => {
|
|
clearTimeout(timeout);
|
|
try {
|
|
const p = JSON.parse(data);
|
|
resolve(p.Node ? {
|
|
hostname: p.Node.ComputedName ?? '',
|
|
tags: p.Node.Tags ?? [],
|
|
user: p.UserProfile?.LoginName ?? '',
|
|
} : null);
|
|
} catch { resolve(null); }
|
|
});
|
|
}
|
|
);
|
|
req.on('error', () => { clearTimeout(timeout); resolve(null); });
|
|
req.end();
|
|
});
|
|
}
|
|
|
|
function isAllowedNode(id) {
|
|
if (!id) return false;
|
|
if (id.tags?.includes('tag:admin')) return true;
|
|
return ALLOWED_NODES.has(id.hostname);
|
|
}
|
|
|
|
// ─── Auth hook ───────────────────────────────────────────────────────────────
|
|
// Public paths: /health, /bci/notify (has its own APIKey check)
|
|
// Everything else: Tailscale whois OR Bearer token
|
|
|
|
fastify.addHook('onRequest', async (req, reply) => {
|
|
const { url } = req;
|
|
if (url === '/health' || url.startsWith('/bci/notify')) return;
|
|
|
|
// Bearer token
|
|
const auth = req.headers.authorization;
|
|
if (auth?.startsWith('Bearer ') && ADMIN_TOKEN && auth.slice(7) === ADMIN_TOKEN) return;
|
|
|
|
// Tailscale whois
|
|
const isLocal = ['127.0.0.1', '::1', '::ffff:127.0.0.1'].includes(req.ip) || req.ip.startsWith('172.');
|
|
if (isLocal) { req.identity = { hostname: 'bruno', tags: ['tag:admin'] }; return; }
|
|
|
|
if (!DEV) {
|
|
const id = await tailscaleWhois(`${req.ip}:${req.socket.remotePort ?? 0}`);
|
|
if (!id || !isAllowedNode(id)) {
|
|
return reply.code(403).send({ error: 'not authorized' });
|
|
}
|
|
req.identity = id;
|
|
}
|
|
});
|
|
|
|
// ─── Routes: health ───────────────────────────────────────────────────────────
|
|
|
|
fastify.get('/health', async () => ({ ok: true, service: 'kua-cashier' }));
|
|
|
|
// ─── Routes: BCI webhook ──────────────────────────────────────────────────────
|
|
// BCI payload fields (discovered from sandbox):
|
|
// idEvento, idContable, monto, fechaTransaccion, fechaNotificacion,
|
|
// rutCliente, dvCliente, tipoTransaccion, anulacion, numeroReintento,
|
|
// concepto.{ nombre, rut, dv, numeroCuenta, codigoBanco, nroOperacion, mensaje }
|
|
//
|
|
// Auth: secret embedded as ?token=... in the callback URL we register with BCI.
|
|
// BCI does NOT echo APIKey back in the webhook body.
|
|
|
|
fastify.post('/bci/notify', async (req, reply) => {
|
|
const raw = req.body ?? {};
|
|
fastify.log.info({ raw }, 'bci webhook received');
|
|
|
|
// Verify token embedded in callback URL query string
|
|
const token = req.query?.token ?? '';
|
|
if (BCI_CFG.webhookSecret && token !== BCI_CFG.webhookSecret) {
|
|
fastify.log.warn('bci webhook: token mismatch');
|
|
return reply.code(401).send({ error: 'invalid token' });
|
|
}
|
|
|
|
// Skip reversals
|
|
if (raw.anulacion === true) {
|
|
return reply.send({ ok: true, skipped: 'anulacion' });
|
|
}
|
|
|
|
// monto is an integer in BCI's payload
|
|
const amount = typeof raw.monto === 'number' ? raw.monto : parsePesos(raw.monto);
|
|
if (!amount || amount <= 0) {
|
|
return reply.code(400).send({ error: 'missing or invalid monto' });
|
|
}
|
|
|
|
// Sender RUT: rutCliente + dvCliente → "88777666-0"
|
|
const senderRut = raw.rutCliente
|
|
? `${raw.rutCliente}-${raw.dvCliente ?? ''}`.replace(/-$/, '')
|
|
: null;
|
|
|
|
// Reference code lives in concepto.mensaje (customer fills in transfer description)
|
|
const description = raw.concepto?.mensaje ?? null;
|
|
|
|
const bankRef = raw.idEvento ?? null;
|
|
const receivedAt = raw.fechaTransaccion ?? raw.fechaNotificacion ?? new Date().toISOString();
|
|
|
|
const { rows: [account] } = await query(
|
|
`SELECT id FROM accounts WHERE bank = 'bci' AND active = true LIMIT 1`
|
|
);
|
|
|
|
const id = newId();
|
|
try {
|
|
await query(
|
|
`INSERT INTO inbound_payments
|
|
(id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw)
|
|
VALUES ($1, 'bci_webhook', $2, $3, $4, $5, $6, $7, $8)`,
|
|
[id, account?.id ?? null, amount, senderRut, description, receivedAt, bankRef, JSON.stringify(raw)]
|
|
);
|
|
} catch (err) {
|
|
if (err.code === '23505') {
|
|
fastify.log.info({ bankRef }, 'bci webhook: duplicate, skipping');
|
|
return reply.send({ ok: true, skipped: 'duplicate' });
|
|
}
|
|
throw err;
|
|
}
|
|
|
|
const { rows: [inbound] } = await query(`SELECT * FROM inbound_payments WHERE id = $1`, [id]);
|
|
await runMatcher(inbound);
|
|
|
|
// BCI expects this specific response shape (S003 sandbox warning goes away with it)
|
|
return reply.send({ Data: { Status: 'RECIBIDO' }, Links: {}, Meta: {} });
|
|
});
|
|
|
|
// ─── Routes: generic inbound (bank-parser, email, etc.) ───────────────────────
|
|
|
|
fastify.post('/inbound/notify', async (req, reply) => {
|
|
const body = req.body ?? {};
|
|
|
|
const amount = parsePesos(body.amount ?? body.amountMinor);
|
|
if (!amount || amount <= 0) return reply.code(400).send({ error: 'amount required' });
|
|
|
|
// Resolve account_id from bank + account_number if provided
|
|
let accountId = body.account_id ?? null;
|
|
if (!accountId && body.bank && body.account_number) {
|
|
const { rows: [acc] } = await query(
|
|
`SELECT id FROM accounts WHERE bank = $1 AND account_number = $2`,
|
|
[body.bank, body.account_number]
|
|
);
|
|
accountId = acc?.id ?? null;
|
|
}
|
|
|
|
const bankRef = body.bank_ref ?? body.bankRef ?? null;
|
|
const id = newId();
|
|
|
|
try {
|
|
await query(
|
|
`INSERT INTO inbound_payments
|
|
(id, source, account_id, amount, sender_rut, description, received_at, bank_ref, raw)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
|
|
[
|
|
id,
|
|
body.source ?? 'bank_parser',
|
|
accountId,
|
|
amount,
|
|
body.sender_rut ?? body.senderRut ?? null,
|
|
body.description ?? null,
|
|
body.received_at ?? body.receivedAt ?? body.postedAt ?? new Date().toISOString(),
|
|
bankRef,
|
|
JSON.stringify(body),
|
|
]
|
|
);
|
|
} catch (err) {
|
|
if (err.code === '23505') return reply.send({ ok: true, skipped: 'duplicate' });
|
|
throw err;
|
|
}
|
|
|
|
const { rows: [inbound] } = await query(`SELECT * FROM inbound_payments WHERE id = $1`, [id]);
|
|
await runMatcher(inbound);
|
|
|
|
return reply.send({ ok: true, inbound_id: id });
|
|
});
|
|
|
|
// ─── Routes: payment intents ──────────────────────────────────────────────────
|
|
|
|
fastify.post('/intent', async (req, reply) => {
|
|
const { source_app, account_id, nominal_amount, sender_rut, callback_url, callback_secret, expires_in = 86400, metadata } = req.body ?? {};
|
|
|
|
if (!source_app) return reply.code(400).send({ error: 'source_app required' });
|
|
if (!account_id) return reply.code(400).send({ error: 'account_id required' });
|
|
if (!nominal_amount || nominal_amount <= 0) return reply.code(400).send({ error: 'nominal_amount required' });
|
|
if (!callback_url) return reply.code(400).send({ error: 'callback_url required' });
|
|
|
|
const { rows: [account] } = await query(`SELECT * FROM accounts WHERE id = $1 AND active = true`, [account_id]);
|
|
if (!account) return reply.code(404).send({ error: 'account not found' });
|
|
|
|
// Find lowest free slot: try nominal_amount-1, nominal_amount-2, ...
|
|
let exactAmount = null;
|
|
let jitter = 0;
|
|
for (let j = 0; j <= 50; j++) {
|
|
const candidate = nominal_amount - j;
|
|
if (candidate <= 0) break;
|
|
const { rows } = await query(
|
|
`SELECT 1 FROM payment_intents WHERE exact_amount = $1 AND account_id = $2 AND status = 'pending'`,
|
|
[candidate, account_id]
|
|
);
|
|
if (rows.length === 0) { exactAmount = candidate; jitter = j; break; }
|
|
}
|
|
if (exactAmount === null) return reply.code(409).send({ error: 'no available amount slot — too many concurrent intents at this amount' });
|
|
|
|
// Generate unique reference code
|
|
let refCode;
|
|
for (let i = 0; i < 10; i++) {
|
|
const candidate = genRefCode();
|
|
const { rows } = await query(`SELECT 1 FROM payment_intents WHERE reference_code = $1`, [candidate]);
|
|
if (rows.length === 0) { refCode = candidate; break; }
|
|
}
|
|
if (!refCode) return reply.code(500).send({ error: 'could not generate unique reference code' });
|
|
|
|
const id = newId();
|
|
const expiresAt = new Date(Date.now() + expires_in * 1000).toISOString();
|
|
|
|
await query(
|
|
`INSERT INTO payment_intents
|
|
(id, source_app, account_id, nominal_amount, exact_amount, jitter, sender_rut,
|
|
reference_code, callback_url, callback_secret, metadata, expires_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`,
|
|
[id, source_app, account_id, nominal_amount, exactAmount, jitter, sender_rut ?? null,
|
|
refCode, callback_url, callback_secret ?? null, metadata ? JSON.stringify(metadata) : null, expiresAt]
|
|
);
|
|
|
|
return reply.code(201).send({
|
|
intent_id: id,
|
|
exact_amount: exactAmount,
|
|
jitter,
|
|
reference_code: refCode,
|
|
account: {
|
|
bank: account.bank,
|
|
account_number: account.account_number,
|
|
owner_name: account.owner_name,
|
|
owner_rut: account.owner_rut,
|
|
},
|
|
expires_at: expiresAt,
|
|
});
|
|
});
|
|
|
|
fastify.get('/intent/:id', async (req, reply) => {
|
|
const { rows: [intent] } = await query(`SELECT * FROM payment_intents WHERE id = $1`, [req.params.id]);
|
|
if (!intent) return reply.code(404).send({ error: 'not found' });
|
|
return reply.send(intent);
|
|
});
|
|
|
|
fastify.delete('/intent/:id', async (req, reply) => {
|
|
const { rows: [intent] } = await query(`SELECT id, status FROM payment_intents WHERE id = $1`, [req.params.id]);
|
|
if (!intent) return reply.code(404).send({ error: 'not found' });
|
|
if (intent.status === 'matched') return reply.code(409).send({ error: 'already matched' });
|
|
await query(`UPDATE payment_intents SET status = 'cancelled' WHERE id = $1`, [intent.id]);
|
|
return reply.send({ ok: true });
|
|
});
|
|
|
|
// ─── Routes: admin ────────────────────────────────────────────────────────────
|
|
|
|
// List unaccounted inbound payments
|
|
fastify.get('/admin/inbound', async (req) => {
|
|
const limit = Math.min(parseInt(req.query.limit ?? '50', 10), 200);
|
|
const { rows } = await query(
|
|
`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at DESC LIMIT $1`,
|
|
[limit]
|
|
);
|
|
return { unaccounted: rows, count: rows.length };
|
|
});
|
|
|
|
// Manually trigger a match sweep over all unaccounted inbounds
|
|
fastify.post('/admin/match', async (_req, reply) => {
|
|
const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC`);
|
|
let matched = 0;
|
|
for (const inbound of rows) {
|
|
const result = await matchInbound(inbound);
|
|
if (result) {
|
|
await confirmAndDispatch(result.intent, inbound, result.confidence);
|
|
matched++;
|
|
}
|
|
}
|
|
return reply.send({ swept: rows.length, matched });
|
|
});
|
|
|
|
// Expire stale intents
|
|
fastify.post('/admin/expire', async (_req, reply) => {
|
|
const { rowCount } = await query(
|
|
`UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()`
|
|
);
|
|
return reply.send({ expired: rowCount });
|
|
});
|
|
|
|
// List all accounts
|
|
fastify.get('/admin/accounts', async () => {
|
|
const { rows } = await query(`SELECT id, bank, account_number, owner_name, owner_rut, active, bci_subscription_id FROM accounts ORDER BY created_at`);
|
|
return { accounts: rows };
|
|
});
|
|
|
|
// Register a new account
|
|
fastify.post('/admin/accounts', async (req, reply) => {
|
|
const { bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret } = req.body ?? {};
|
|
if (!bank || !account_number || !owner_name || !owner_rut) {
|
|
return reply.code(400).send({ error: 'bank, account_number, owner_name, owner_rut required' });
|
|
}
|
|
const id = newId();
|
|
await query(
|
|
`INSERT INTO accounts (id, bank, account_number, owner_name, owner_rut, check_digit, api_key, webhook_secret)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
|
|
ON CONFLICT (bank, account_number) DO UPDATE
|
|
SET owner_name = EXCLUDED.owner_name,
|
|
owner_rut = EXCLUDED.owner_rut,
|
|
check_digit = EXCLUDED.check_digit,
|
|
api_key = EXCLUDED.api_key,
|
|
webhook_secret = EXCLUDED.webhook_secret`,
|
|
[id, bank, account_number, owner_name, owner_rut, check_digit ?? null, api_key ?? null, webhook_secret ?? null]
|
|
);
|
|
const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE bank = $1 AND account_number = $2`, [bank, account_number]);
|
|
return reply.code(201).send(acc);
|
|
});
|
|
|
|
// Register BCI webhook subscription for an account
|
|
fastify.post('/admin/bci/subscribe', async (req, reply) => {
|
|
const { account_id } = req.body ?? {};
|
|
const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']);
|
|
if (!acc) return reply.code(404).send({ error: 'account not found' });
|
|
|
|
const callbackUrl = `${PUBLIC_URL}/bci/notify`;
|
|
const result = await createSubscription({
|
|
organizationName: acc.owner_name,
|
|
account: acc.account_number,
|
|
rut: acc.owner_rut,
|
|
checkDigit: acc.check_digit,
|
|
callbackUrl,
|
|
subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey,
|
|
webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret,
|
|
});
|
|
|
|
await query(`UPDATE accounts SET bci_subscription_id = $1 WHERE id = $2`, [JSON.stringify(result), acc.id]);
|
|
return reply.send({ ok: true, subscription: result, callback_url: callbackUrl });
|
|
});
|
|
|
|
// Fire a test BCI notification (discovers real payload shape)
|
|
fastify.post('/admin/bci/simulate', async (req, reply) => {
|
|
const { account_id, amount = 1000 } = req.body ?? {};
|
|
const { rows: [acc] } = await query(`SELECT * FROM accounts WHERE id = $1`, [account_id ?? '']);
|
|
if (!acc) return reply.code(404).send({ error: 'account not found' });
|
|
|
|
const result = await simulateNotification({
|
|
callbackUrl: `${PUBLIC_URL}/bci/notify`,
|
|
amount,
|
|
subscriptionKey: acc.api_key ?? BCI_CFG.subscriptionKey,
|
|
webhookSecret: acc.webhook_secret ?? BCI_CFG.webhookSecret,
|
|
});
|
|
return reply.send({ ok: true, result });
|
|
});
|
|
|
|
// ─── Matcher helper ───────────────────────────────────────────────────────────
|
|
|
|
async function runMatcher(inbound) {
|
|
const result = await matchInbound(inbound);
|
|
if (result) {
|
|
fastify.log.info({ intent_id: result.intent.id, inbound_id: inbound.id, confidence: result.confidence }, 'match found');
|
|
await confirmAndDispatch(result.intent, inbound, result.confidence);
|
|
}
|
|
}
|
|
|
|
// ─── Periodic sweep ───────────────────────────────────────────────────────────
|
|
// Re-attempt unaccounted inbounds every 5 minutes in case a pending intent
|
|
// was registered after the inbound arrived (race condition).
|
|
|
|
function startSweepCron() {
|
|
setInterval(async () => {
|
|
try {
|
|
// Expire stale intents first
|
|
await query(`UPDATE payment_intents SET status = 'expired' WHERE status = 'pending' AND expires_at < now()`);
|
|
|
|
const { rows } = await query(`SELECT * FROM inbound_payments WHERE status = 'unaccounted' ORDER BY detected_at ASC LIMIT 100`);
|
|
for (const inbound of rows) {
|
|
const result = await matchInbound(inbound);
|
|
if (result) await confirmAndDispatch(result.intent, inbound, result.confidence);
|
|
}
|
|
} catch (err) {
|
|
fastify.log.error({ err }, 'sweep cron error');
|
|
}
|
|
}, 5 * 60 * 1000);
|
|
}
|
|
|
|
// ─── Start ────────────────────────────────────────────────────────────────────
|
|
|
|
try {
|
|
await migrate();
|
|
fastify.log.info('database migrated');
|
|
await fastify.listen({ port: PORT, host: '0.0.0.0' });
|
|
startSweepCron();
|
|
fastify.log.info(`kua-cashier listening on ${PORT}`);
|
|
} catch (err) {
|
|
fastify.log.error(err);
|
|
process.exit(1);
|
|
}
|