redis.go (3308B)
1 package main 2 3 /* 4 * Copyright (c) 2017-2021 Ivan Jelincic <parazyd@dyne.org> 5 * 6 * This file is part of tor-dam 7 * 8 * This program is free software: you can redistribute it and/or modify 9 * it under the terms of the GNU Affero General Public License as published by 10 * the Free Software Foundation, either version 3 of the License, or 11 * (at your option) any later version. 12 * 13 * This program is distributed in the hope that it will be useful, 14 * but WITHOUT ANY WARRANTY; without even the implied warranty of 15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 * GNU Affero General Public License for more details. 17 * 18 * You should have received a copy of the GNU Affero General Public License 19 * along with this program. If not, see <http://www.gnu.org/licenses/>. 20 */ 21 22 import ( 23 "context" 24 "fmt" 25 "log" 26 "os/exec" 27 "strconv" 28 "strings" 29 "time" 30 31 "github.com/go-redis/redis" 32 ) 33 34 // rctx is the Redis context (necessary in newer go-redis) 35 var rctx = context.Background() 36 var rcli *redis.Client 37 38 func pollPrune(interval int64) { 39 for { 40 log.Println("Polling redis for expired nodes") 41 nodes, err := rcli.Keys(rctx, "*.onion").Result() 42 if err != nil { 43 log.Println("WARNING: Nonfatal error in pollPrune:", err.Error()) 44 } 45 now := time.Now().Unix() 46 47 for _, i := range nodes { 48 res, err := rcli.HGet(rctx, i, "lastseen").Result() 49 if err != nil { 50 log.Println("WARNING: Nonfatal error in pollPrune:", err.Error()) 51 continue 52 } 53 ls, err := strconv.Atoi(res) 54 if err != nil { 55 log.Println("WARNING: Nonfatal error in pollPrune:", err.Error()) 56 continue 57 } 58 59 diff := (now - int64(ls)) / 60 60 if diff > interval { 61 log.Printf("Deleting %s (expired)\n", i) 62 publishToRedis('D', i) 63 rcli.Del(rctx, i) 64 } 65 } 66 time.Sleep(time.Duration(interval) * time.Minute) 67 } 68 } 69 70 func publishToRedis(mt rune, addr string) { 71 data, err := rcli.HGetAll(rctx, addr).Result() 72 if err != nil { 73 log.Println("WARNING: Nonfatal err in publishToRedis:", err.Error()) 74 return 75 } 76 77 if data["lastseen"] == data["firstseen"] { 78 mt = 'A' 79 } else if mt != 'D' { 80 mt = 'M' 81 } 82 83 // TODO: First of the "addr" references could be alias/nickname 84 85 rcli.Publish(rctx, pubsubChan, fmt.Sprintf("%s|%s|%v|%s", 86 data["lastseen"], addr, mt, addr)) 87 } 88 89 func newredisrc(dir string) string { 90 return fmt.Sprintf(`daemonize no 91 bind %s 92 port %d 93 databases 1 94 dir %s 95 dbfilename tor-dam.rdb 96 save 900 1 97 save 300 10 98 save 60 10000 99 rdbcompression yes 100 rdbchecksum yes 101 stop-writes-on-bgsave-error no`, 102 redisAddr.IP.String(), redisAddr.Port, dir) 103 } 104 105 func spawnRedis() (*exec.Cmd, error) { 106 var err error 107 redisAddr, err = getListener() 108 if err != nil { 109 return nil, err 110 } 111 112 rcli = redis.NewClient(&redis.Options{ 113 Addr: redisAddr.String(), 114 Password: "", 115 DB: 0, 116 }) 117 118 log.Println("Forking Redis daemon on", redisAddr.String()) 119 120 cmd := exec.Command("redis-server", "-") 121 cmd.Stdin = strings.NewReader(newredisrc(*workdir)) 122 123 err = cmd.Start() 124 if err != nil { 125 return nil, err 126 } 127 128 time.Sleep(500 * time.Millisecond) 129 if _, err := rcli.Ping(rctx).Result(); err != nil { 130 return cmd, err 131 } 132 133 pubsub := rcli.Subscribe(rctx, pubsubChan) 134 if _, err := pubsub.Receive(rctx); err != nil { 135 return cmd, err 136 } 137 138 log.Printf("Created \"%s\" channel in Redis\n", pubsubChan) 139 140 return cmd, nil 141 }