287 lines
8.1 KiB
Go
287 lines
8.1 KiB
Go
package webhooks
|
|
|
|
import (
|
|
"context"
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/appview/storage"
|
|
"atcr.io/pkg/atproto"
|
|
)
|
|
|
|
// Dispatcher handles webhook delivery for push and scan notifications.
|
|
// It reads webhooks from the appview DB and delivers payloads
|
|
// with Discord/Slack formatting and HMAC signing.
|
|
type Dispatcher struct {
|
|
db db.DBTX
|
|
meta atproto.AppviewMetadata
|
|
}
|
|
|
|
// NewDispatcher creates a new webhook dispatcher
|
|
func NewDispatcher(database db.DBTX, meta atproto.AppviewMetadata) *Dispatcher {
|
|
return &Dispatcher{
|
|
db: database,
|
|
meta: meta,
|
|
}
|
|
}
|
|
|
|
// DispatchForScan fires matching webhooks after a scan record arrives via Jetstream.
|
|
// previousScan is nil for first-time scans. userHandle is used for payload enrichment.
|
|
func (d *Dispatcher) DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string) {
|
|
webhooks, err := db.GetWebhooksForUser(d.db, scan.UserDID)
|
|
if err != nil || len(webhooks) == 0 {
|
|
return
|
|
}
|
|
|
|
isFirst := previousScan == nil
|
|
isChanged := previousScan != nil && vulnCountsChanged(scan, previousScan)
|
|
|
|
scanInfo := WebhookScanInfo{
|
|
ScannedAt: scan.ScannedAt.Format(time.RFC3339),
|
|
ScannerVersion: scan.ScannerVersion,
|
|
Vulnerabilities: WebhookVulnCounts{
|
|
Critical: scan.Critical,
|
|
High: scan.High,
|
|
Medium: scan.Medium,
|
|
Low: scan.Low,
|
|
Total: scan.Total,
|
|
},
|
|
}
|
|
|
|
manifestInfo := WebhookManifestInfo{
|
|
Digest: scan.ManifestDigest,
|
|
Repository: scan.Repository,
|
|
Tag: tag,
|
|
UserDID: scan.UserDID,
|
|
UserHandle: userHandle,
|
|
}
|
|
|
|
for _, wh := range webhooks {
|
|
// Check each trigger condition against bitmask
|
|
var triggers []string
|
|
if wh.Triggers&TriggerFirst != 0 && isFirst {
|
|
triggers = append(triggers, "scan:first")
|
|
}
|
|
if wh.Triggers&TriggerAll != 0 {
|
|
triggers = append(triggers, "scan:all")
|
|
}
|
|
if wh.Triggers&TriggerChanged != 0 && isChanged {
|
|
triggers = append(triggers, "scan:changed")
|
|
}
|
|
|
|
for _, trigger := range triggers {
|
|
payload := WebhookPayload{
|
|
Trigger: trigger,
|
|
HoldDID: scan.HoldDID,
|
|
HoldEndpoint: holdEndpoint,
|
|
Manifest: manifestInfo,
|
|
Scan: scanInfo,
|
|
}
|
|
|
|
// Include previous counts for scan:changed
|
|
if trigger == "scan:changed" && previousScan != nil {
|
|
payload.Previous = &WebhookVulnCounts{
|
|
Critical: previousScan.Critical,
|
|
High: previousScan.High,
|
|
Medium: previousScan.Medium,
|
|
Low: previousScan.Low,
|
|
Total: previousScan.Total,
|
|
}
|
|
}
|
|
|
|
payloadBytes, err := json.Marshal(payload)
|
|
if err != nil {
|
|
slog.Error("Failed to marshal webhook payload", "error", err)
|
|
continue
|
|
}
|
|
|
|
go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes)
|
|
}
|
|
}
|
|
}
|
|
|
|
// DispatchForPush fires matching webhooks after a manifest is pushed.
|
|
func (d *Dispatcher) DispatchForPush(ctx context.Context, event storage.PushWebhookEvent) {
|
|
webhooks, err := db.GetWebhooksForUser(d.db, event.OwnerDID)
|
|
if err != nil || len(webhooks) == 0 {
|
|
return
|
|
}
|
|
|
|
// Fetch star/pull counts for payload enrichment
|
|
var starCount, pullCount int
|
|
stats, err := db.GetRepositoryStats(d.db, event.OwnerDID, event.Repository)
|
|
if err == nil && stats != nil {
|
|
starCount = stats.StarCount
|
|
pullCount = stats.PullCount
|
|
}
|
|
|
|
// Build repo URL using the primary registry domain (pull domain) if available
|
|
baseURL := d.meta.BaseURL
|
|
if len(d.meta.RegistryDomains) > 0 {
|
|
baseURL = "https://" + d.meta.RegistryDomains[0]
|
|
}
|
|
repoURL := fmt.Sprintf("%s/%s/%s", baseURL, event.OwnerHandle, event.Repository)
|
|
|
|
payload := PushWebhookPayload{
|
|
Trigger: "push",
|
|
PushData: PushData{
|
|
PushedAt: time.Now().Format(time.RFC3339),
|
|
Pusher: event.PusherHandle,
|
|
PusherDID: event.PusherDID,
|
|
Tag: event.Tag,
|
|
Digest: event.Digest,
|
|
},
|
|
Repository: PushRepository{
|
|
Name: event.Repository,
|
|
Namespace: event.OwnerHandle,
|
|
RepoName: event.OwnerHandle + "/" + event.Repository,
|
|
RepoURL: repoURL,
|
|
MediaType: event.MediaType,
|
|
StarCount: starCount,
|
|
PullCount: pullCount,
|
|
},
|
|
Hold: PushHold{
|
|
DID: event.HoldDID,
|
|
Endpoint: event.HoldEndpoint,
|
|
},
|
|
}
|
|
|
|
payloadBytes, err := json.Marshal(payload)
|
|
if err != nil {
|
|
slog.Error("Failed to marshal push webhook payload", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, wh := range webhooks {
|
|
if wh.Triggers&TriggerPush == 0 {
|
|
continue
|
|
}
|
|
go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes)
|
|
}
|
|
}
|
|
|
|
// DeliverTest sends a test payload to a specific webhook (synchronous, single attempt)
|
|
func (d *Dispatcher) DeliverTest(ctx context.Context, webhookID, userDID, userHandle string) (bool, error) {
|
|
wh, err := db.GetWebhookByID(d.db, webhookID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if wh.UserDID != userDID {
|
|
return false, fmt.Errorf("unauthorized")
|
|
}
|
|
|
|
// Randomize vulnerability counts so each test shows a different severity color
|
|
critical := rand.IntN(3)
|
|
high := rand.IntN(5)
|
|
medium := rand.IntN(8)
|
|
low := rand.IntN(10)
|
|
total := critical + high + medium + low
|
|
|
|
payload := WebhookPayload{
|
|
Trigger: "test",
|
|
Manifest: WebhookManifestInfo{
|
|
Digest: "sha256:0000000000000000000000000000000000000000000000000000000000000000",
|
|
Repository: "test-repo",
|
|
Tag: "latest",
|
|
UserDID: userDID,
|
|
UserHandle: userHandle,
|
|
},
|
|
Scan: WebhookScanInfo{
|
|
ScannedAt: time.Now().Format(time.RFC3339),
|
|
ScannerVersion: "atcr-scanner-v1.0.0",
|
|
Vulnerabilities: WebhookVulnCounts{
|
|
Critical: critical, High: high, Medium: medium, Low: low, Total: total,
|
|
},
|
|
},
|
|
}
|
|
|
|
payloadBytes, _ := json.Marshal(payload)
|
|
success := d.attemptDelivery(wh.URL, wh.Secret, payloadBytes)
|
|
return success, nil
|
|
}
|
|
|
|
// deliverWithRetry attempts to deliver a webhook with exponential backoff
|
|
func (d *Dispatcher) deliverWithRetry(webhookURL, secret string, payload []byte) {
|
|
delays := []time.Duration{0, 30 * time.Second, 2 * time.Minute, 8 * time.Minute}
|
|
for attempt, delay := range delays {
|
|
if attempt > 0 {
|
|
time.Sleep(delay)
|
|
}
|
|
if d.attemptDelivery(webhookURL, secret, payload) {
|
|
return
|
|
}
|
|
}
|
|
slog.Warn("Webhook delivery failed after retries", "url", maskURL(webhookURL))
|
|
}
|
|
|
|
// attemptDelivery sends a single webhook HTTP POST
|
|
func (d *Dispatcher) attemptDelivery(webhookURL, secret string, payload []byte) bool {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// Reformat payload for platform-specific webhook APIs
|
|
sendPayload := payload
|
|
if isDiscordWebhook(webhookURL) || isSlackWebhook(webhookURL) {
|
|
formatted, fmtErr := formatPlatformPayload(payload, webhookURL, d.meta)
|
|
if fmtErr == nil {
|
|
sendPayload = formatted
|
|
}
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, strings.NewReader(string(sendPayload)))
|
|
if err != nil {
|
|
slog.Warn("Failed to create webhook request", "error", err)
|
|
return false
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("User-Agent", d.meta.ClientShortName+"-Webhook/1.0")
|
|
|
|
// HMAC signing if secret is set (signs the actual payload sent)
|
|
if secret != "" {
|
|
mac := hmac.New(sha256.New, []byte(secret))
|
|
mac.Write(sendPayload)
|
|
sig := hex.EncodeToString(mac.Sum(nil))
|
|
req.Header.Set("X-Webhook-Signature-256", "sha256="+sig)
|
|
}
|
|
|
|
client := &http.Client{Timeout: 10 * time.Second}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
slog.Warn("Webhook delivery attempt failed", "url", maskURL(webhookURL), "error", err)
|
|
return false
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
|
slog.Info("Webhook delivered successfully", "url", maskURL(webhookURL), "status", resp.StatusCode)
|
|
return true
|
|
}
|
|
|
|
// Read response body for debugging
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
|
|
slog.Warn("Webhook delivery got non-2xx response",
|
|
"url", maskURL(webhookURL),
|
|
"status", resp.StatusCode,
|
|
"body", string(body))
|
|
return false
|
|
}
|
|
|
|
// vulnCountsChanged checks if vulnerability counts differ between scans
|
|
func vulnCountsChanged(current, previous *db.Scan) bool {
|
|
return current.Critical != previous.Critical ||
|
|
current.High != previous.High ||
|
|
current.Medium != previous.Medium ||
|
|
current.Low != previous.Low
|
|
}
|