mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-05 18:42:19 +00:00
0c5d1d64a3
* sentry client: log connected peer info * observer: unseen sentry peers report * observer: refactoring node.go to node_utils * observer: sentry candidates intake
928 lines
23 KiB
Go
928 lines
23 KiB
Go
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
_ "modernc.org/sqlite"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type DBSQLite struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// language=SQL
|
|
const (
|
|
sqlCreateSchema = `
|
|
PRAGMA journal_mode = WAL;
|
|
|
|
CREATE TABLE IF NOT EXISTS nodes (
|
|
id TEXT PRIMARY KEY,
|
|
|
|
ip TEXT,
|
|
port_disc INTEGER,
|
|
port_rlpx INTEGER,
|
|
ip_v6 TEXT,
|
|
ip_v6_port_disc INTEGER,
|
|
ip_v6_port_rlpx INTEGER,
|
|
addr_updated INTEGER NOT NULL,
|
|
|
|
ping_try INTEGER NOT NULL DEFAULT 0,
|
|
|
|
compat_fork INTEGER,
|
|
compat_fork_updated INTEGER,
|
|
|
|
client_id TEXT,
|
|
network_id INTEGER,
|
|
eth_version INTEGER,
|
|
handshake_transient_err INTEGER NOT NULL DEFAULT 0,
|
|
handshake_updated INTEGER,
|
|
handshake_retry_time INTEGER,
|
|
|
|
neighbor_keys TEXT,
|
|
|
|
crawl_retry_time INTEGER
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS handshake_errors (
|
|
id TEXT NOT NULL,
|
|
err TEXT NOT NULL,
|
|
updated INTEGER NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS sentry_candidates_intake (
|
|
id INTEGER PRIMARY KEY,
|
|
last_event_time INTEGER NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_crawl_retry_time ON nodes (crawl_retry_time);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_ip ON nodes (ip);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_ip_v6 ON nodes (ip_v6);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_ping_try ON nodes (ping_try);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_compat_fork ON nodes (compat_fork);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_network_id ON nodes (network_id);
|
|
CREATE INDEX IF NOT EXISTS idx_nodes_handshake_retry_time ON nodes (handshake_retry_time);
|
|
CREATE INDEX IF NOT EXISTS idx_handshake_errors_id ON handshake_errors (id);
|
|
`
|
|
|
|
sqlUpsertNodeAddr = `
|
|
INSERT INTO nodes(
|
|
id,
|
|
ip,
|
|
port_disc,
|
|
port_rlpx,
|
|
ip_v6,
|
|
ip_v6_port_disc,
|
|
ip_v6_port_rlpx,
|
|
addr_updated
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
ip = excluded.ip,
|
|
port_disc = excluded.port_disc,
|
|
port_rlpx = excluded.port_rlpx,
|
|
ip_v6 = excluded.ip_v6,
|
|
ip_v6_port_disc = excluded.ip_v6_port_disc,
|
|
ip_v6_port_rlpx = excluded.ip_v6_port_rlpx,
|
|
addr_updated = excluded.addr_updated
|
|
`
|
|
|
|
sqlFindNodeAddr = `
|
|
SELECT
|
|
ip,
|
|
port_disc,
|
|
port_rlpx,
|
|
ip_v6,
|
|
ip_v6_port_disc,
|
|
ip_v6_port_rlpx
|
|
FROM nodes
|
|
WHERE id = ?
|
|
`
|
|
|
|
sqlResetPingError = `
|
|
UPDATE nodes SET ping_try = 0 WHERE id = ?
|
|
`
|
|
|
|
sqlUpdatePingError = `
|
|
UPDATE nodes SET ping_try = nodes.ping_try + 1 WHERE id = ?
|
|
`
|
|
|
|
sqlCountPingErrors = `
|
|
SELECT ping_try FROM nodes WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateClientID = `
|
|
UPDATE nodes SET
|
|
client_id = ?,
|
|
handshake_updated = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
sqlFindClientID = `
|
|
SELECT client_id FROM nodes WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateNetworkID = `
|
|
UPDATE nodes SET
|
|
network_id = ?,
|
|
handshake_updated = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateEthVersion = `
|
|
UPDATE nodes SET
|
|
eth_version = ?,
|
|
handshake_updated = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateHandshakeTransientError = `
|
|
UPDATE nodes SET
|
|
handshake_transient_err = ?,
|
|
handshake_updated = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
sqlInsertHandshakeError = `
|
|
INSERT INTO handshake_errors(
|
|
id,
|
|
err,
|
|
updated
|
|
) VALUES (?, ?, ?)
|
|
`
|
|
|
|
sqlDeleteHandshakeErrors = `
|
|
DELETE FROM handshake_errors WHERE id = ?
|
|
`
|
|
|
|
sqlFindHandshakeLastErrors = `
|
|
SELECT err, updated FROM handshake_errors
|
|
WHERE id = ?
|
|
ORDER BY updated DESC
|
|
LIMIT ?
|
|
`
|
|
|
|
sqlUpdateHandshakeRetryTime = `
|
|
UPDATE nodes SET handshake_retry_time = ? WHERE id = ?
|
|
`
|
|
|
|
sqlFindHandshakeRetryTime = `
|
|
SELECT handshake_retry_time FROM nodes WHERE id = ?
|
|
`
|
|
|
|
sqlCountHandshakeCandidates = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE ((handshake_retry_time IS NULL) OR (handshake_retry_time < ?))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
`
|
|
|
|
sqlFindHandshakeCandidates = `
|
|
SELECT id FROM nodes
|
|
WHERE ((handshake_retry_time IS NULL) OR (handshake_retry_time < ?))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
ORDER BY handshake_retry_time
|
|
LIMIT ?
|
|
`
|
|
|
|
sqlMarkTakenHandshakeCandidates = `
|
|
UPDATE nodes SET handshake_retry_time = ? WHERE id IN (123)
|
|
`
|
|
|
|
sqlUpdateForkCompatibility = `
|
|
UPDATE nodes SET compat_fork = ?, compat_fork_updated = ? WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateNeighborBucketKeys = `
|
|
UPDATE nodes SET neighbor_keys = ? WHERE id = ?
|
|
`
|
|
|
|
sqlFindNeighborBucketKeys = `
|
|
SELECT neighbor_keys FROM nodes WHERE id = ?
|
|
`
|
|
|
|
sqlUpdateSentryCandidatesLastEventTime = `
|
|
INSERT INTO sentry_candidates_intake(
|
|
id,
|
|
last_event_time
|
|
) VALUES (0, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
last_event_time = excluded.last_event_time
|
|
`
|
|
|
|
sqlFindSentryCandidatesLastEventTime = `
|
|
SELECT last_event_time FROM sentry_candidates_intake WHERE id = 0
|
|
`
|
|
|
|
sqlUpdateCrawlRetryTime = `
|
|
UPDATE nodes SET crawl_retry_time = ? WHERE id = ?
|
|
`
|
|
|
|
sqlCountCandidates = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE ((crawl_retry_time IS NULL) OR (crawl_retry_time < ?))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
`
|
|
|
|
sqlFindCandidates = `
|
|
SELECT id FROM nodes
|
|
WHERE ((crawl_retry_time IS NULL) OR (crawl_retry_time < ?))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
ORDER BY crawl_retry_time
|
|
LIMIT ?
|
|
`
|
|
|
|
sqlMarkTakenNodes = `
|
|
UPDATE nodes SET crawl_retry_time = ? WHERE id IN (123)
|
|
`
|
|
|
|
sqlCountNodes = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND ((network_id = ?) OR (network_id IS NULL))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
`
|
|
|
|
sqlCountIPs = `
|
|
SELECT COUNT(DISTINCT ip) FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND ((network_id = ?) OR (network_id IS NULL))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
`
|
|
|
|
sqlCountClients = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND (network_id = ?)
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
AND (client_id LIKE ?)
|
|
`
|
|
|
|
sqlCountClientsWithNetworkID = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND (network_id IS NOT NULL)
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
AND (client_id LIKE ?)
|
|
`
|
|
|
|
sqlCountClientsWithHandshakeTransientError = `
|
|
SELECT COUNT(*) FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND (handshake_transient_err = 1)
|
|
AND (network_id IS NULL)
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
AND (client_id LIKE ?)
|
|
`
|
|
|
|
sqlEnumerateClientIDs = `
|
|
SELECT client_id FROM nodes
|
|
WHERE (ping_try < ?)
|
|
AND ((network_id = ?) OR (network_id IS NULL))
|
|
AND ((compat_fork == TRUE) OR (compat_fork IS NULL))
|
|
`
|
|
)
|
|
|
|
func NewDBSQLite(filePath string) (*DBSQLite, error) {
|
|
db, err := sql.Open("sqlite", filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open DB: %w", err)
|
|
}
|
|
|
|
_, err = db.Exec(sqlCreateSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create the DB schema: %w", err)
|
|
}
|
|
|
|
instance := DBSQLite{db}
|
|
return &instance, nil
|
|
}
|
|
|
|
func (db *DBSQLite) Close() error {
|
|
return db.db.Close()
|
|
}
|
|
|
|
func (db *DBSQLite) UpsertNodeAddr(ctx context.Context, id NodeID, addr NodeAddr) error {
|
|
var ip *string
|
|
if addr.IP != nil {
|
|
value := addr.IP.String()
|
|
ip = &value
|
|
}
|
|
|
|
var ipV6 *string
|
|
if addr.IPv6.IP != nil {
|
|
value := addr.IPv6.IP.String()
|
|
ipV6 = &value
|
|
}
|
|
|
|
var portDisc *int
|
|
if (ip != nil) && (addr.PortDisc != 0) {
|
|
value := int(addr.PortDisc)
|
|
portDisc = &value
|
|
}
|
|
|
|
var ipV6PortDisc *int
|
|
if (ipV6 != nil) && (addr.IPv6.PortDisc != 0) {
|
|
value := int(addr.IPv6.PortDisc)
|
|
ipV6PortDisc = &value
|
|
}
|
|
|
|
var portRLPx *int
|
|
if (ip != nil) && (addr.PortRLPx != 0) {
|
|
value := int(addr.PortRLPx)
|
|
portRLPx = &value
|
|
}
|
|
|
|
var ipV6PortRLPx *int
|
|
if (ipV6 != nil) && (addr.IPv6.PortRLPx != 0) {
|
|
value := int(addr.IPv6.PortRLPx)
|
|
ipV6PortRLPx = &value
|
|
}
|
|
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpsertNodeAddr,
|
|
id,
|
|
ip, portDisc, portRLPx,
|
|
ipV6, ipV6PortDisc, ipV6PortRLPx,
|
|
updated)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to upsert a node address: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindNodeAddr(ctx context.Context, id NodeID) (*NodeAddr, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlFindNodeAddr, id)
|
|
|
|
var ip sql.NullString
|
|
var portDisc sql.NullInt32
|
|
var portRLPx sql.NullInt32
|
|
var ipV6 sql.NullString
|
|
var ipV6PortDisc sql.NullInt32
|
|
var ipV6PortRLPx sql.NullInt32
|
|
|
|
err := row.Scan(
|
|
&ip,
|
|
&portDisc,
|
|
&portRLPx,
|
|
&ipV6,
|
|
&ipV6PortDisc,
|
|
&ipV6PortRLPx)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("FindNodeAddr failed: %w", err)
|
|
}
|
|
|
|
var addr NodeAddr
|
|
|
|
if ip.Valid {
|
|
value := net.ParseIP(ip.String)
|
|
if value == nil {
|
|
return nil, errors.New("FindNodeAddr failed to parse IP")
|
|
}
|
|
addr.IP = value
|
|
}
|
|
if ipV6.Valid {
|
|
value := net.ParseIP(ipV6.String)
|
|
if value == nil {
|
|
return nil, errors.New("FindNodeAddr failed to parse IPv6")
|
|
}
|
|
addr.IPv6.IP = value
|
|
}
|
|
if portDisc.Valid {
|
|
value := uint16(portDisc.Int32)
|
|
addr.PortDisc = value
|
|
}
|
|
if portRLPx.Valid {
|
|
value := uint16(portRLPx.Int32)
|
|
addr.PortRLPx = value
|
|
}
|
|
if ipV6PortDisc.Valid {
|
|
value := uint16(ipV6PortDisc.Int32)
|
|
addr.IPv6.PortDisc = value
|
|
}
|
|
if ipV6PortRLPx.Valid {
|
|
value := uint16(ipV6PortRLPx.Int32)
|
|
addr.IPv6.PortRLPx = value
|
|
}
|
|
|
|
return &addr, nil
|
|
}
|
|
|
|
func (db *DBSQLite) ResetPingError(ctx context.Context, id NodeID) error {
|
|
_, err := db.db.ExecContext(ctx, sqlResetPingError, id)
|
|
if err != nil {
|
|
return fmt.Errorf("ResetPingError failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdatePingError(ctx context.Context, id NodeID) error {
|
|
_, err := db.db.ExecContext(ctx, sqlUpdatePingError, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdatePingError failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountPingErrors(ctx context.Context, id NodeID) (*uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountPingErrors, id)
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("CountPingErrors failed: %w", err)
|
|
}
|
|
return &count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateClientID(ctx context.Context, id NodeID, clientID string) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateClientID, clientID, updated, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateClientID failed to update a node: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindClientID(ctx context.Context, id NodeID) (*string, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlFindClientID, id)
|
|
var clientID sql.NullString
|
|
err := row.Scan(&clientID)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("FindClientID failed: %w", err)
|
|
}
|
|
if clientID.Valid {
|
|
return &clientID.String, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateNetworkID(ctx context.Context, id NodeID, networkID uint) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateNetworkID, networkID, updated, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateNetworkID failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateEthVersion(ctx context.Context, id NodeID, ethVersion uint) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateEthVersion, ethVersion, updated, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateEthVersion failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateHandshakeTransientError(ctx context.Context, id NodeID, hasTransientErr bool) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateHandshakeTransientError, hasTransientErr, updated, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateHandshakeTransientError failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) InsertHandshakeError(ctx context.Context, id NodeID, handshakeErr string) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlInsertHandshakeError, id, handshakeErr, updated)
|
|
if err != nil {
|
|
return fmt.Errorf("InsertHandshakeError failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) DeleteHandshakeErrors(ctx context.Context, id NodeID) error {
|
|
_, err := db.db.ExecContext(ctx, sqlDeleteHandshakeErrors, id)
|
|
if err != nil {
|
|
return fmt.Errorf("DeleteHandshakeErrors failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindHandshakeLastErrors(ctx context.Context, id NodeID, limit uint) ([]HandshakeError, error) {
|
|
cursor, err := db.db.QueryContext(
|
|
ctx,
|
|
sqlFindHandshakeLastErrors,
|
|
id,
|
|
limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeLastErrors failed to query: %w", err)
|
|
}
|
|
defer func() {
|
|
_ = cursor.Close()
|
|
}()
|
|
|
|
var handshakeErrors []HandshakeError
|
|
for cursor.Next() {
|
|
var stringCode string
|
|
var updatedTimestamp int64
|
|
err := cursor.Scan(&stringCode, &updatedTimestamp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeLastErrors failed to read data: %w", err)
|
|
}
|
|
|
|
handshakeError := HandshakeError{
|
|
stringCode,
|
|
time.Unix(updatedTimestamp, 0),
|
|
}
|
|
|
|
handshakeErrors = append(handshakeErrors, handshakeError)
|
|
}
|
|
|
|
if err := cursor.Err(); err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeLastErrors failed to iterate over rows: %w", err)
|
|
}
|
|
return handshakeErrors, nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateHandshakeRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error {
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateHandshakeRetryTime, retryTime.Unix(), id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateHandshakeRetryTime failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindHandshakeRetryTime(ctx context.Context, id NodeID) (*time.Time, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlFindHandshakeRetryTime, id)
|
|
|
|
var timestamp sql.NullInt64
|
|
|
|
if err := row.Scan(×tamp); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("FindHandshakeRetryTime failed: %w", err)
|
|
}
|
|
|
|
// if never we tried to handshake then the time is NULL
|
|
if !timestamp.Valid {
|
|
return nil, nil
|
|
}
|
|
|
|
retryTime := time.Unix(timestamp.Int64, 0)
|
|
return &retryTime, nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountHandshakeCandidates(ctx context.Context) (uint, error) {
|
|
retryTimeBefore := time.Now().Unix()
|
|
row := db.db.QueryRowContext(ctx, sqlCountHandshakeCandidates, retryTimeBefore)
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountHandshakeCandidates failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindHandshakeCandidates(
|
|
ctx context.Context,
|
|
limit uint,
|
|
) ([]NodeID, error) {
|
|
retryTimeBefore := time.Now().Unix()
|
|
cursor, err := db.db.QueryContext(
|
|
ctx,
|
|
sqlFindHandshakeCandidates,
|
|
retryTimeBefore,
|
|
limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeCandidates failed to query candidates: %w", err)
|
|
}
|
|
defer func() {
|
|
_ = cursor.Close()
|
|
}()
|
|
|
|
var nodes []NodeID
|
|
for cursor.Next() {
|
|
var id string
|
|
err := cursor.Scan(&id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeCandidates failed to read candidate data: %w", err)
|
|
}
|
|
|
|
nodes = append(nodes, NodeID(id))
|
|
}
|
|
|
|
if err := cursor.Err(); err != nil {
|
|
return nil, fmt.Errorf("FindHandshakeCandidates failed to iterate over candidates: %w", err)
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
func (db *DBSQLite) MarkTakenHandshakeCandidates(ctx context.Context, ids []NodeID) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
delayedRetryTime := time.Now().Add(time.Hour).Unix()
|
|
|
|
idsPlaceholders := strings.TrimRight(strings.Repeat("?,", len(ids)), ",")
|
|
query := strings.Replace(sqlMarkTakenHandshakeCandidates, "123", idsPlaceholders, 1)
|
|
args := append([]interface{}{delayedRetryTime}, stringsToAny(ids)...)
|
|
|
|
_, err := db.db.ExecContext(ctx, query, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to mark taken handshake candidates: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) TakeHandshakeCandidates(
|
|
ctx context.Context,
|
|
limit uint,
|
|
) ([]NodeID, error) {
|
|
tx, err := db.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("TakeHandshakeCandidates failed to start transaction: %w", err)
|
|
}
|
|
|
|
ids, err := db.FindHandshakeCandidates(
|
|
ctx,
|
|
limit)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
err = db.MarkTakenHandshakeCandidates(ctx, ids)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("TakeHandshakeCandidates failed to commit transaction: %w", err)
|
|
}
|
|
return ids, nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateForkCompatibility(ctx context.Context, id NodeID, isCompatFork bool) error {
|
|
updated := time.Now().Unix()
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateForkCompatibility, isCompatFork, updated, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateForkCompatibility failed to update a node: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateNeighborBucketKeys(ctx context.Context, id NodeID, keys []string) error {
|
|
keysStr := strings.Join(keys, ",")
|
|
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateNeighborBucketKeys, keysStr, id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateNeighborBucketKeys failed to update a node: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindNeighborBucketKeys(ctx context.Context, id NodeID) ([]string, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlFindNeighborBucketKeys, id)
|
|
|
|
var keysStr sql.NullString
|
|
if err := row.Scan(&keysStr); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("FindNeighborBucketKeys failed: %w", err)
|
|
}
|
|
|
|
if !keysStr.Valid {
|
|
return nil, nil
|
|
}
|
|
return strings.Split(keysStr.String, ","), nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateSentryCandidatesLastEventTime(ctx context.Context, value time.Time) error {
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateSentryCandidatesLastEventTime, value.Unix())
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateSentryCandidatesLastEventTime failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindSentryCandidatesLastEventTime(ctx context.Context) (*time.Time, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlFindSentryCandidatesLastEventTime)
|
|
|
|
var timestamp sql.NullInt64
|
|
if err := row.Scan(×tamp); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("FindSentryCandidatesLastEventTime failed: %w", err)
|
|
}
|
|
|
|
value := time.Unix(timestamp.Int64, 0)
|
|
return &value, nil
|
|
}
|
|
|
|
func (db *DBSQLite) UpdateCrawlRetryTime(ctx context.Context, id NodeID, retryTime time.Time) error {
|
|
_, err := db.db.ExecContext(ctx, sqlUpdateCrawlRetryTime, retryTime.Unix(), id)
|
|
if err != nil {
|
|
return fmt.Errorf("UpdateCrawlRetryTime failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountCandidates(ctx context.Context) (uint, error) {
|
|
retryTimeBefore := time.Now().Unix()
|
|
row := db.db.QueryRowContext(ctx, sqlCountCandidates, retryTimeBefore)
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountCandidates failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) FindCandidates(
|
|
ctx context.Context,
|
|
limit uint,
|
|
) ([]NodeID, error) {
|
|
retryTimeBefore := time.Now().Unix()
|
|
cursor, err := db.db.QueryContext(
|
|
ctx,
|
|
sqlFindCandidates,
|
|
retryTimeBefore,
|
|
limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindCandidates failed to query candidates: %w", err)
|
|
}
|
|
defer func() {
|
|
_ = cursor.Close()
|
|
}()
|
|
|
|
var nodes []NodeID
|
|
for cursor.Next() {
|
|
var id string
|
|
err := cursor.Scan(&id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("FindCandidates failed to read candidate data: %w", err)
|
|
}
|
|
|
|
nodes = append(nodes, NodeID(id))
|
|
}
|
|
|
|
if err := cursor.Err(); err != nil {
|
|
return nil, fmt.Errorf("FindCandidates failed to iterate over candidates: %w", err)
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
func (db *DBSQLite) MarkTakenNodes(ctx context.Context, ids []NodeID) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
delayedRetryTime := time.Now().Add(time.Hour).Unix()
|
|
|
|
idsPlaceholders := strings.TrimRight(strings.Repeat("?,", len(ids)), ",")
|
|
query := strings.Replace(sqlMarkTakenNodes, "123", idsPlaceholders, 1)
|
|
args := append([]interface{}{delayedRetryTime}, stringsToAny(ids)...)
|
|
|
|
_, err := db.db.ExecContext(ctx, query, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to mark taken nodes: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DBSQLite) TakeCandidates(
|
|
ctx context.Context,
|
|
limit uint,
|
|
) ([]NodeID, error) {
|
|
tx, err := db.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("TakeCandidates failed to start transaction: %w", err)
|
|
}
|
|
|
|
ids, err := db.FindCandidates(
|
|
ctx,
|
|
limit)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
err = db.MarkTakenNodes(ctx, ids)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("TakeCandidates failed to commit transaction: %w", err)
|
|
}
|
|
return ids, nil
|
|
}
|
|
|
|
func (db *DBSQLite) IsConflictError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
return strings.Contains(err.Error(), "SQLITE_BUSY")
|
|
}
|
|
|
|
func (db *DBSQLite) CountNodes(ctx context.Context, maxPingTries uint, networkID uint) (uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountNodes, maxPingTries, networkID)
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountNodes failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountIPs(ctx context.Context, maxPingTries uint, networkID uint) (uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountIPs, maxPingTries, networkID)
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountIPs failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountClients(ctx context.Context, clientIDPrefix string, maxPingTries uint, networkID uint) (uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountClients, maxPingTries, networkID, clientIDPrefix+"%")
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountClients failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountClientsWithNetworkID(ctx context.Context, clientIDPrefix string, maxPingTries uint) (uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountClientsWithNetworkID, maxPingTries, clientIDPrefix+"%")
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountClientsWithNetworkID failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) CountClientsWithHandshakeTransientError(ctx context.Context, clientIDPrefix string, maxPingTries uint) (uint, error) {
|
|
row := db.db.QueryRowContext(ctx, sqlCountClientsWithHandshakeTransientError, maxPingTries, clientIDPrefix+"%")
|
|
var count uint
|
|
if err := row.Scan(&count); err != nil {
|
|
return 0, fmt.Errorf("CountClientsWithHandshakeTransientError failed: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (db *DBSQLite) EnumerateClientIDs(
|
|
ctx context.Context,
|
|
maxPingTries uint,
|
|
networkID uint,
|
|
enumFunc func(clientID *string),
|
|
) error {
|
|
cursor, err := db.db.QueryContext(ctx, sqlEnumerateClientIDs, maxPingTries, networkID)
|
|
if err != nil {
|
|
return fmt.Errorf("EnumerateClientIDs failed to query: %w", err)
|
|
}
|
|
defer func() {
|
|
_ = cursor.Close()
|
|
}()
|
|
|
|
for cursor.Next() {
|
|
var clientID sql.NullString
|
|
err := cursor.Scan(&clientID)
|
|
if err != nil {
|
|
return fmt.Errorf("EnumerateClientIDs failed to read data: %w", err)
|
|
}
|
|
if clientID.Valid {
|
|
enumFunc(&clientID.String)
|
|
} else {
|
|
enumFunc(nil)
|
|
}
|
|
}
|
|
|
|
if err := cursor.Err(); err != nil {
|
|
return fmt.Errorf("EnumerateClientIDs failed to iterate: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func stringsToAny(strValues []NodeID) []interface{} {
|
|
values := make([]interface{}, 0, len(strValues))
|
|
for _, value := range strValues {
|
|
values = append(values, value)
|
|
}
|
|
return values
|
|
}
|