[cpif] r12 - in trunk: . db-test
svn at argo.es
svn at argo.es
Thu Apr 19 11:54:44 CEST 2007
Author: jcea
Date: Thu Apr 19 11:54:42 2007
New Revision: 12
Log:
Para ir abriendo boca...
Added:
trunk/db-test/
trunk/monitor.py (contents, props changed)
trunk/storage.py (contents, props changed)
Modified:
trunk/ (props changed)
Added: trunk/monitor.py
==============================================================================
--- (empty file)
+++ trunk/monitor.py Thu Apr 19 11:54:42 2007
@@ -0,0 +1,50 @@
+# $Id$
+
+from __future__ import with_statement
+
+# Esta clase es un constructor de decoradores
+# utilizados para sincronizar el acceso al
+# sistema de persistencia.
+#
+# Serian innecesarios si cada "thread" tuviese
+# su conexion separada al sistema de persistencia,
+# pero eso supondria tener una cache independiente,
+# lo que consume memoria. Lo mas eficiente es tener
+# una unica cache, compartida con todo el mundo.
+# Pero en ese caso, hay que serializar los accesos.
+#
+# Cada monitor proporciona un acceso separado e
+# independiente, asi que podemos tener lo mejor
+# de ambos mundos.
+#
+# Esta clase, ademas, se encarga de gestionar los
+# conflictos automaticamente, para no tener que
+# preocuparnos de ello en el resto del programa.
+
+class monitor(object) :
+ def __init__(self,constructor) :
+ import threading
+ self.mutex=threading.Lock()
+ self.conn=constructor()
+
+ def __call__(self,func) :
+ def _monitor(*args, **kwargs) :
+ from durus.error import ConflictError
+ while True : # Reintenta si hay conflictos
+ with self.mutex :
+ try : # Nos aseguramos de liberar el lock
+ self.conn.abort() # Hacemos limpieza de cache
+ ret=func(*args, **kwargs)
+ self.conn.commit()
+ return ret
+ except ConflictError :
+ pass # El abort ya se hace en el bucle
+ except :
+ self.conn.abort()
+ import sys
+ import time
+ print >>sys.stderr,time.ctime()
+ raise
+
+ return _monitor
+
Added: trunk/storage.py
==============================================================================
--- (empty file)
+++ trunk/storage.py Thu Apr 19 11:54:42 2007
@@ -0,0 +1,57 @@
+# $Id$
+
+import sys
+sys.path.append("./durus-berkeleydbstorage/")
+
+# Para poder acceder a los objetos desde el exterior
+# del sistema, por si las moscas, utilizamos un "storage"
+# de persistencia con acceso via unix socket
+
+def storage(path) :
+ import berkeleydb_storage
+ st=berkeleydb_storage.BerkeleyDBStorage(path,do_recover=True,durable=False,async_log_write=True)
+
+ def storage_background(storage,path) :
+ from durus.storage_server import StorageServer
+ try :
+ StorageServer(storage, address=path).serve()
+ except SystemExit :
+ storage.close()
+ import traceback
+ traceback.print_exc()
+ import os
+ os._exit(os.EX_OK)
+
+ path+="/unix_socket"
+ import threading
+ storage_thread=threading.Thread(target=storage_background,args=(st,path))
+ storage_thread.setDaemon(True)
+ storage_thread.start()
+
+ def conn_constructor() :
+ from durus.client_storage import ClientStorage
+ from durus.connection import Connection
+
+ conn= Connection(ClientStorage(address=path))
+ return conn
+
+ return storage_thread,conn_constructor
+
+def storage_test(path) :
+ import os
+ for i in os.listdir(path) :
+ if i[0]=="." : continue
+ os.remove(path+"/"+i)
+
+ return storage(path)
+
+def storage_and_monitor(path,storage=storage) :
+ thread,constructor=storage(path)
+ import monitor,time
+ while thread.isAlive() :
+ try :
+ m=monitor.monitor(constructor)
+ return thread,constructor,m
+ except :
+ time.sleep(0.1)
+
More information about the cpif
mailing list