[cpif] r12 - in trunk: . db-test

svn at argo.es svn at argo.es
Thu Apr 19 11:54:44 CEST 2007


Author: jcea
Date: Thu Apr 19 11:54:42 2007
New Revision: 12

Log:
Para ir abriendo boca...

Added:
   trunk/db-test/
   trunk/monitor.py   (contents, props changed)
   trunk/storage.py   (contents, props changed)
Modified:
   trunk/   (props changed)

Added: trunk/monitor.py
==============================================================================
--- (empty file)
+++ trunk/monitor.py	Thu Apr 19 11:54:42 2007
@@ -0,0 +1,50 @@
+# $Id$
+
+from __future__ import with_statement
+
+# Esta clase es un constructor de decoradores
+# utilizados para sincronizar el acceso al
+# sistema de persistencia.
+#
+# Serian innecesarios si cada "thread" tuviese
+# su conexion separada al sistema de persistencia,
+# pero eso supondria tener una cache independiente,
+# lo que consume memoria. Lo mas eficiente es tener
+# una unica cache, compartida con todo el mundo.
+# Pero en ese caso, hay que serializar los accesos.
+#
+# Cada monitor proporciona un acceso separado e
+# independiente, asi que podemos tener lo mejor
+# de ambos mundos.
+#
+# Esta clase, ademas, se encarga de gestionar los
+# conflictos automaticamente, para no tener que
+# preocuparnos de ello en el resto del programa.
+
+class monitor(object) :
+  def __init__(self,constructor) :
+    import threading
+    self.mutex=threading.Lock()
+    self.conn=constructor()
+
+  def __call__(self,func) :
+    def _monitor(*args, **kwargs) :
+      from durus.error import ConflictError
+      while True : # Reintenta si hay conflictos
+        with self.mutex :
+          try : # Nos aseguramos de liberar el lock
+            self.conn.abort() # Hacemos limpieza de cache
+            ret=func(*args, **kwargs)
+            self.conn.commit()
+            return ret
+          except ConflictError :
+            pass # El abort ya se hace en el bucle
+          except :
+            self.conn.abort()
+            import sys
+            import time
+            print >>sys.stderr,time.ctime()
+            raise
+
+    return _monitor
+

Added: trunk/storage.py
==============================================================================
--- (empty file)
+++ trunk/storage.py	Thu Apr 19 11:54:42 2007
@@ -0,0 +1,57 @@
+# $Id$
+
+import sys
+sys.path.append("./durus-berkeleydbstorage/")
+
+# Para poder acceder a los objetos desde el exterior
+# del sistema, por si las moscas, utilizamos un "storage"
+# de persistencia con acceso via unix socket
+
+def storage(path) :
+  import berkeleydb_storage
+  st=berkeleydb_storage.BerkeleyDBStorage(path,do_recover=True,durable=False,async_log_write=True)
+
+  def storage_background(storage,path) :
+    from durus.storage_server import StorageServer
+    try :
+      StorageServer(storage, address=path).serve()
+    except SystemExit :
+      storage.close()
+      import traceback
+      traceback.print_exc()
+      import os
+      os._exit(os.EX_OK)
+
+  path+="/unix_socket"
+  import threading
+  storage_thread=threading.Thread(target=storage_background,args=(st,path))
+  storage_thread.setDaemon(True)
+  storage_thread.start()
+
+  def conn_constructor() :
+    from durus.client_storage import ClientStorage
+    from durus.connection import Connection
+
+    conn= Connection(ClientStorage(address=path))
+    return conn
+
+  return storage_thread,conn_constructor
+
+def storage_test(path) :
+  import os
+  for i in os.listdir(path) :
+    if i[0]=="." : continue
+    os.remove(path+"/"+i)
+
+  return storage(path)
+
+def storage_and_monitor(path,storage=storage) :
+  thread,constructor=storage(path)
+  import monitor,time
+  while thread.isAlive() :
+    try :
+      m=monitor.monitor(constructor)
+      return thread,constructor,m
+    except :
+      time.sleep(0.1)
+



More information about the cpif mailing list