From 6c719c08d8643b7b2d96663e9baef13ba430a4b4 Mon Sep 17 00:00:00 2001
From: Per Cederqvist <ceder@lysator.liu.se>
Date: Tue, 19 Dec 2006 19:37:04 +0000
Subject: [PATCH] Run distribute-tasks interactively and have the operator
 confirm any changes.  Better handling of changed origins.

* distribute-tasks
  (JobInfo.__init__): Initialize __active.
  (JobInfo.set_active): New method.
  (JobInfo.active): New method.
  (JobInfo.destination): New method.
  (tasks_per_destination): New global variable.
  (interactive): New global variable.
  (read_tasks): Update tasks_per_destination.
  (read_new_tasks): Use JobInfo.source() to simplify code.  Update
    tasks_per_destination.  Set lysrdiffpart of info if the source is
    already backed up, to avoid moving a backed up directory to a
    different partition.  Report a fatal error if the destination
    changes.
  (write_task_lists): Only skip jobs if running interactively.  Only
    reorder tasks if running non-interactively.  Have the user confirm
    changes if running interactively.
  (main): New argument: -i (--interactive).  Only read new tasks if
    running interactively.
---
 ChangeLog        | 20 +++++++++++
 distribute-tasks | 86 +++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 97 insertions(+), 9 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index ce2e6fb..94047d2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,25 @@
 2006-12-19  Per Cederqvist  <ceder@sedesopim.lysator.liu.se>
 
+	Run distribute-tasks interactively and have the operator confirm
+	any changes.  Better handling of changed origins.
+	* distribute-tasks (JobInfo.__init__): Initialize __active.
+	(JobInfo.set_active): New method.
+	(JobInfo.active): New method.
+	(JobInfo.destination): New method.
+	(tasks_per_destination): New global variable.
+	(interactive): New global variable.
+	(read_tasks): Update tasks_per_destination.
+	(read_new_tasks): Use JobInfo.source() to simplify code.  Update
+	tasks_per_destination.  Set lysrdiffpart of info if the source is
+	already backed up, to avoid moving a backed up directory to a
+	different partition.  Report a fatal error if the destination
+	changes.
+	(write_task_lists): Only skip jobs if running interactively.  Only
+	reorder tasks if running non-interactively.  Have the user confirm
+	changes if running interactively.
+	(main): New argument: -i (--interactive).  Only read new tasks if
+	running interactively.
+
 	Use syslog.  Keep track of the origin of backups.  Removed
 	workaround for manhattan.  Detect crashed servers faster.
 	* backup-one-task: Removed special case for manhattan.  Store the
diff --git a/distribute-tasks b/distribute-tasks
index 0b68c3f..8bd4b6f 100755
--- a/distribute-tasks
+++ b/distribute-tasks
@@ -4,6 +4,7 @@ import os
 import sys
 import random
 import sets
+import getopt
 
 ROOT = "/lysrdiff"
 
@@ -15,6 +16,13 @@ class JobInfo(object):
         self.__host = host
         self.__directory = directory
         self.__lysrdiffpart = lysrdiffpart
+        self.__active = False
+
+    def set_active(self):
+        self.__active = True
+
+    def active(self):
+        return self.__active
 
     def set_lysrdiffpart(self, part):
         self.__lysrdiffpart = part
@@ -37,6 +45,9 @@ class JobInfo(object):
     def source(self):
         return (self.host(), self.directory())
 
+    def destination(self):
+        return (self.category(), self.subcategory())
+
     def rdiff_dest(self):
         return "/lysrdiff/%d/perm/lysrdiff/backups/%s/%s" % (
             self.lysrdiffpart(), self.category(), self.subcategory())
@@ -53,10 +64,15 @@ def newtasks():
 # value: JobInfo
 tasks_per_source = {}
 
+# key: (category, subcategory)
+# value: list of JobInfo
+tasks_per_destination = {}
+
 # value: JobInfo
 ordered_tasks = []
 
 fatal = False
+interactive = False
 
 def tasklist_file(lysrdiffpart):
     return "/lysrdiff/%d/perm/lysrdiff/tasks" % lysrdiffpart
@@ -77,8 +93,9 @@ def read_tasks(lysrdiffpart):
                 tasks_per_source[info.source()].task_desc(),
                 info.task_desc()))
             fatal = True
-                             
+
         tasks_per_source[info.source()] = info
+        tasks_per_destination.setdefault(info.destination(), []).append(info)
 	ordered_tasks.append(info)
 
 def read_new_tasks():
@@ -88,12 +105,27 @@ def read_new_tasks():
     new_tasks = []
     for line in file("/nobackup/backup.lysator/var/tasks"):
         info = parse_line(line)
-        if (info.host(), info.directory()) not in tasks_per_source:
-            info.set_lysrdiffpart(newtasks())
-            tasks_per_source[(info.host(), info.directory())] = info
+        if info.source() not in tasks_per_source:
+            for other in tasks_per_destination.get(info.destination(), []):
+                if other.lysrdiffpart() is not None:
+                    info.set_lysrdiffpart(other.lysrdiffpart())
+                    break
+            else:
+                info.set_lysrdiffpart(newtasks())
+            tasks_per_source[info.source()] = info
+            tasks_per_destination.setdefault(info.destination(), []).append(
+                info)
 	    new_tasks.append(info)
             new_found = True
 
+        if tasks_per_source[info.source()].destination() == info.destination():
+            tasks_per_source[info.source()].set_active()
+        else:
+            sys.stderr.write("Changed destination detected!\n  %s\n  %s\n" % (
+                tasks_per_source[info.source()].destination(),
+                info.destination()))
+            fatal = True
+
     random.shuffle(new_tasks)
 
     ordered_tasks = new_tasks + ordered_tasks
@@ -102,16 +134,40 @@ def read_new_tasks():
 def write_task_lists(partitions):
     jobs = ordered_tasks[:]
 
+    skipped = False
+
+    if interactive:
+        ix = 0
+        while ix < len(jobs):
+            job	= jobs[ix]
+            if job.lysrdiffpart() in partitions and not job.active():
+                print "Skipping:", job.task_desc()
+                for other in tasks_per_destination[job.destination()]:
+                    if other != job and other.active():
+                        print "     new:", other.task_desc()
+                raw_input("[confirm]")
+                del jobs[ix]
+                tasks_per_destination[job.destination()].remove(job)
+                del tasks_per_source[job.source()]
+                skipped = True
+            else:
+                ix += 1
+
     # Pick a few lucky jobs and move them to the front of the queue.
     # This way, we get roughly the same order as on the previous
     # backup (which is good because each job will then be backuped up
     # with approximately the same interval) but no job is (on average)
     # favoured over any other job.
-    for x in range(1 + int(0.005 * len(jobs))):
-	lucky_ix = random.randrange(0, len(jobs))
-	jobs = [jobs[lucky_ix]] + jobs[:lucky_ix] + jobs[lucky_ix+1:]
+    #
+    # Don't do this if we are running interactively and removing a
+    # job, since that would make the diffs harder to read.
+    if not interactive:
+        for x in range(1 + int(0.005 * len(jobs))):
+            lucky_ix = random.randrange(0, len(jobs))
+            jobs = [jobs[lucky_ix]] + jobs[:lucky_ix] + jobs[lucky_ix+1:]
 
     files = {}
+
     for job in jobs:
         part = job.lysrdiffpart()
         if part in partitions:
@@ -120,20 +176,32 @@ def write_task_lists(partitions):
                 os.mkdir(fn + ".lock")
                 files[part] = file(fn + ".new", "w")
             files[part].write(job.task_desc() + "\n")
+
     for lysrdiffpart, fp in files.items():
         fp.close()
         fn = tasklist_file(lysrdiffpart)
+        if skipped:
+            os.system("diff -u %s %s" % (fn, fn + ".new"))
+            raw_input("[confirm]")
+        
         os.rename(fn + ".new", fn)
         os.rmdir(fn + ".lock")
 
 def main():
     global fatal
+    global interactive
+
+    opts, args = getopt.getopt(sys.argv[1:], "-i", ["interactive"])
+    for opt, optval in opts:
+        if opt in ("-i", "--interactive"):
+            interactive = True
 
-    partitions = sets.Set([int(x) for x in sys.argv[1:]])
+    partitions = sets.Set([int(x) for x in args])
 
     for lysrdiffpart in range(2):
         read_tasks(lysrdiffpart)
-    read_new_tasks()
+    if interactive:
+        read_new_tasks()
     if not fatal:
         write_task_lists(partitions)
 
-- 
GitLab