Alex Sharov 226982d293
Use class dir.Rw - to separate Ro and Rw access to snapshotDir (#3534)
* save

* save

* save

* save

* save

* save

* save
2022-02-18 09:24:17 +07:00

286 lines
8.8 KiB

package main
import (
lg ""
grpc_middleware ""
grpc_recovery ""
proto_downloader ""
// Command-line flag values. Each variable is bound to a cobra flag in init or
// withDatadir and read later by Downloader or the torrent_hashes command.
var (
// datadir is the root data directory; snapshots are read from <datadir>/snapshots.
datadir string
// asJson makes torrent_hashes print JSON instead of TOML.
asJson bool
// forceRebuild makes torrent_hashes delete and re-create the .torrent files.
forceRebuild bool
// forceVerify makes torrent_hashes verify the data files and return.
forceVerify bool
// downloaderApiAddr is the TCP address the downloader gRPC server listens on.
downloaderApiAddr string
// torrentVerbosity is looked up in torrentcfg.String2LogLevel to get the torrent log level.
torrentVerbosity string
// downloadRateStr and uploadRateStr are textual sizes (e.g. "8mb") parsed into datasize.ByteSize.
downloadRateStr, uploadRateStr string
// torrentPort is passed to torrentcfg.New as the BitTorrent listen port.
torrentPort int
func init() {
flags := append(debug.Flags, utils.MetricFlags...)
utils.CobraFlags(rootCmd, flags)
rootCmd.Flags().StringVar(&downloaderApiAddr, "downloader.api.addr", "", "external downloader api network address, for example: serves remote downloader interface")
rootCmd.Flags().StringVar(&torrentVerbosity, "torrent.verbosity", lg.Warning.LogString(), "DEBUG | INFO | WARN | ERROR")
rootCmd.Flags().StringVar(&downloadRateStr, "", "8mb", "bytes per second, example: 32mb")
rootCmd.Flags().StringVar(&uploadRateStr, "torrent.upload.rate", "8mb", "bytes per second, example: 32mb")
rootCmd.Flags().IntVar(&torrentPort, "torrent.port", 42069, "port to listen and serve BitTorrent protocol")
printTorrentHashes.PersistentFlags().BoolVar(&asJson, "json", false, "Print in json format (default: toml)")
printTorrentHashes.PersistentFlags().BoolVar(&forceRebuild, "rebuild", false, "Force re-create .torrent files")
printTorrentHashes.PersistentFlags().BoolVar(&forceVerify, "verify", false, "Force verify data files if have .torrent files")
// withDatadir registers the data-directory flag (named utils.DataDirFlag.Name)
// on cmd, storing its value in datadir with paths.DefaultDataDir() as the
// default, and then marks that flag as a directory name via MarkFlagDirname.
// NOTE(review): the body of the error branch and the closing braces are not
// visible in this view — confirm how a MarkFlagDirname failure is handled.
func withDatadir(cmd *cobra.Command) {
cmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
if err := cmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil {
// main obtains the root context from common.RootContext, defers its cancel
// function, and executes rootCmd with that context.
// NOTE(review): the handling of an ExecuteContext error is not visible in
// this view — confirm it reports the error and exits non-zero.
func main() {
ctx, cancel := common.RootContext()
defer cancel()
if err := rootCmd.ExecuteContext(ctx); err != nil {
// rootCmd is the top-level cobra command; running it starts the snapshot
// downloader via Downloader.
var rootCmd = &cobra.Command{
Use: "",
Short: "snapshot downloader",
Example: "go run ./cmd/snapshots --datadir <your_datadir> --downloader.api.addr",
// Calls debug.SetupCobra on the command in the pre-run hook.
// NOTE(review): the body of the error branch below and the body of
// PersistentPostRun are not visible in this view.
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if err := debug.SetupCobra(cmd); err != nil {
PersistentPostRun: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
// A Downloader failure is logged here and nil is returned on both paths,
// so cobra never receives an error from this command.
if err := Downloader(cmd.Context()); err != nil {
log.Error("Downloader", "err", err)
return nil
return nil
func Downloader(ctx context.Context) error {
snapshotDir, err := dir.OpenRw(filepath.Join(datadir, "snapshots"))
if err != nil {
return err
defer snapshotDir.Close()
torrentLogLevel, ok := torrentcfg.String2LogLevel[torrentVerbosity]
if !ok {
panic(fmt.Errorf("unexpected torrent.verbosity level: %s", torrentVerbosity))
var downloadRate, uploadRate datasize.ByteSize
if err := downloadRate.UnmarshalText([]byte(downloadRateStr)); err != nil {
return err
if err := uploadRate.UnmarshalText([]byte(uploadRateStr)); err != nil {
return err
log.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", datadir, "download.rate", downloadRate.String(), "upload.rate", uploadRate.String())
cfg, pieceCompletion, err := torrentcfg.New(snapshotDir, torrentLogLevel, downloadRate, uploadRate, torrentPort)
if err != nil {
return err
defer pieceCompletion.Close()
protocols, err := downloader.New(cfg, snapshotDir)
if err != nil {
return err
defer protocols.Close()
log.Info("[torrent] Start", "my peerID", fmt.Sprintf("%x", protocols.TorrentClient.PeerID()))
if err = downloader.CreateTorrentFilesAndAdd(ctx, snapshotDir, protocols.TorrentClient); err != nil {
return fmt.Errorf("CreateTorrentFilesAndAdd: %w", err)
go downloader.LoggingLoop(ctx, protocols.TorrentClient)
bittorrentServer, err := downloader.NewGrpcServer(protocols.DB, protocols, snapshotDir, true)
if err != nil {
return fmt.Errorf("new server: %w", err)
grpcServer, err := StartGrpc(bittorrentServer, downloaderApiAddr, nil)
if err != nil {
return err
defer grpcServer.GracefulStop()
return nil
var printTorrentHashes = &cobra.Command{
Use: "torrent_hashes",
Example: "go run ./cmd/downloader torrent_hashes --datadir <your_datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
snapshotDir := filepath.Join(datadir, "snapshots")
ctx := cmd.Context()
if forceVerify { // remove and create .torrent files (will re-read all snapshots)
return downloader.VerifyDtaFiles(ctx, snapshotDir)
if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
lockedSnapshotDir, err := dir.OpenRw(snapshotDir)
if err != nil {
return err
defer lockedSnapshotDir.Close()
files, err := downloader.AllTorrentPaths(snapshotDir)
if err != nil {
return err
for _, filePath := range files {
if err := os.Remove(filePath); err != nil {
return err
if err := downloader.BuildTorrentFilesIfNeed(ctx, lockedSnapshotDir); err != nil {
return err
res := map[string]string{}
files, err := downloader.AllTorrentPaths(snapshotDir)
if err != nil {
return err
for _, torrentFilePath := range files {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return err
info, err := mi.UnmarshalInfo()
if err != nil {
return err
res[info.Name] = mi.HashInfoBytes().String()
var serialized []byte
if asJson {
serialized, err = json.Marshal(res)
if err != nil {
return err
} else {
serialized, err = toml.Marshal(res)
if err != nil {
return err
fmt.Printf("%s\n", serialized)
return nil
func removeChunksStorage(snapshotDir *dir.Rw) {
_ = os.RemoveAll(filepath.Join(snapshotDir.Path, ".torrent.db"))
_ = os.RemoveAll(filepath.Join(snapshotDir.Path, ".torrent.bolt.db"))
_ = os.RemoveAll(filepath.Join(snapshotDir.Path, ".torrent.db-shm"))
_ = os.RemoveAll(filepath.Join(snapshotDir.Path, ".torrent.db-wal"))
// StartGrpc listens on the TCP address addr, builds a gRPC server, registers
// the reflection service, the downloader service (when snServer is non-nil)
// and a health service on it, and starts serving in a background goroutine.
//
// creds is optional: when non-nil, the dereferenced transport credentials are
// added to the server options.
//
// It returns the started server, or an error if the listener cannot be created.
// A later Serve failure is only logged from the goroutine, not returned.
func StartGrpc(snServer *downloader.GrpcServer, addr string, creds *credentials.TransportCredentials) (*grpc.Server, error) {
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr)
var (
streamInterceptors []grpc.StreamServerInterceptor
unaryInterceptors []grpc.UnaryServerInterceptor
// Collect the grpc_recovery stream and unary interceptors.
// NOTE(review): the lines that install these slices into opts are not
// visible in this view — confirm they are actually passed to the server.
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
//if metrics.Enabled {
// streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
// NOTE(review): the option wrapping MinTime/PermitWithoutStream is not visible
// here; presumably a keepalive enforcement policy — confirm.
opts := []grpc.ServerOption{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
if creds == nil {
// no specific opts
} else {
opts = append(opts, grpc.Creds(*creds))
grpcServer := grpc.NewServer(opts...)
reflection.Register(grpcServer) // Register reflection service on gRPC server.
if snServer != nil {
proto_downloader.RegisterDownloaderServer(grpcServer, snServer)
//if metrics.Enabled {
// grpc_prometheus.Register(grpcServer)
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
// Serve in the background; the health server is shut down when Serve returns.
go func() {
defer healthServer.Shutdown()
if err := grpcServer.Serve(lis); err != nil {
log.Error("gRPC server stop", "err", err)
log.Info("Started gRPC server", "on", addr)
return grpcServer, nil