tor-dam

Tor distributed announce mechanism (not a DHT)
git clone https://git.parazyd.org/tor-dam
Log | Files | Refs | README | LICENSE

redis.go (3308B)


      1 package main
      2 
      3 /*
      4  * Copyright (c) 2017-2021 Ivan Jelincic <parazyd@dyne.org>
      5  *
      6  * This file is part of tor-dam
      7  *
      8  * This program is free software: you can redistribute it and/or modify
      9  * it under the terms of the GNU Affero General Public License as published by
     10  * the Free Software Foundation, either version 3 of the License, or
     11  * (at your option) any later version.
     12  *
     13  * This program is distributed in the hope that it will be useful,
     14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16  * GNU Affero General Public License for more details.
     17  *
     18  * You should have received a copy of the GNU Affero General Public License
     19  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
     20  */
     21 
     22 import (
     23 	"context"
     24 	"fmt"
     25 	"log"
     26 	"os/exec"
     27 	"strconv"
     28 	"strings"
     29 	"time"
     30 
     31 	"github.com/go-redis/redis"
     32 )
     33 
     34 // rctx is the Redis context (necessary in newer go-redis)
     35 var rctx = context.Background()
     36 var rcli *redis.Client
     37 
     38 func pollPrune(interval int64) {
     39 	for {
     40 		log.Println("Polling redis for expired nodes")
     41 		nodes, err := rcli.Keys(rctx, "*.onion").Result()
     42 		if err != nil {
     43 			log.Println("WARNING: Nonfatal error in pollPrune:", err.Error())
     44 		}
     45 		now := time.Now().Unix()
     46 
     47 		for _, i := range nodes {
     48 			res, err := rcli.HGet(rctx, i, "lastseen").Result()
     49 			if err != nil {
     50 				log.Println("WARNING: Nonfatal error in pollPrune:", err.Error())
     51 				continue
     52 			}
     53 			ls, err := strconv.Atoi(res)
     54 			if err != nil {
     55 				log.Println("WARNING: Nonfatal error in pollPrune:", err.Error())
     56 				continue
     57 			}
     58 
     59 			diff := (now - int64(ls)) / 60
     60 			if diff > interval {
     61 				log.Printf("Deleting %s (expired)\n", i)
     62 				publishToRedis('D', i)
     63 				rcli.Del(rctx, i)
     64 			}
     65 		}
     66 		time.Sleep(time.Duration(interval) * time.Minute)
     67 	}
     68 }
     69 
     70 func publishToRedis(mt rune, addr string) {
     71 	data, err := rcli.HGetAll(rctx, addr).Result()
     72 	if err != nil {
     73 		log.Println("WARNING: Nonfatal err in publishToRedis:", err.Error())
     74 		return
     75 	}
     76 
     77 	if data["lastseen"] == data["firstseen"] {
     78 		mt = 'A'
     79 	} else if mt != 'D' {
     80 		mt = 'M'
     81 	}
     82 
     83 	// TODO: First of the "addr" references could be alias/nickname
     84 
     85 	rcli.Publish(rctx, pubsubChan, fmt.Sprintf("%s|%s|%v|%s",
     86 		data["lastseen"], addr, mt, addr))
     87 }
     88 
     89 func newredisrc(dir string) string {
     90 	return fmt.Sprintf(`daemonize no
     91 bind %s
     92 port %d
     93 databases 1
     94 dir %s
     95 dbfilename tor-dam.rdb
     96 save 900 1
     97 save 300 10
     98 save 60 10000
     99 rdbcompression yes
    100 rdbchecksum yes
    101 stop-writes-on-bgsave-error no`,
    102 		redisAddr.IP.String(), redisAddr.Port, dir)
    103 }
    104 
    105 func spawnRedis() (*exec.Cmd, error) {
    106 	var err error
    107 	redisAddr, err = getListener()
    108 	if err != nil {
    109 		return nil, err
    110 	}
    111 
    112 	rcli = redis.NewClient(&redis.Options{
    113 		Addr:     redisAddr.String(),
    114 		Password: "",
    115 		DB:       0,
    116 	})
    117 
    118 	log.Println("Forking Redis daemon on", redisAddr.String())
    119 
    120 	cmd := exec.Command("redis-server", "-")
    121 	cmd.Stdin = strings.NewReader(newredisrc(*workdir))
    122 
    123 	err = cmd.Start()
    124 	if err != nil {
    125 		return nil, err
    126 	}
    127 
    128 	time.Sleep(500 * time.Millisecond)
    129 	if _, err := rcli.Ping(rctx).Result(); err != nil {
    130 		return cmd, err
    131 	}
    132 
    133 	pubsub := rcli.Subscribe(rctx, pubsubChan)
    134 	if _, err := pubsub.Receive(rctx); err != nil {
    135 		return cmd, err
    136 	}
    137 
    138 	log.Printf("Created \"%s\" channel in Redis\n", pubsubChan)
    139 
    140 	return cmd, nil
    141 }