import logging
from io import BufferedReader
import bigchaindb_driver as bd
import prov.graph as provgraph
import prov.model as provmodel
from bigchaindb_driver import pool as bdpool
from networkx import is_directed_acyclic_graph
from networkx import isolates
from networkx import topological_sort
from prov2bigchaindb.core import utils, local_stores, accounts
# Module-level logger, named after this module, used by every client class below.
log = logging.getLogger(__name__)
# NOTE(review): this configures the root logger at INFO level as a side effect of importing
# the module; applications that set up their own logging may want to confirm this is intended.
logging.basicConfig(level=logging.INFO)
class BaseClient(object):
    """
    BigchainDB Base Client

    Holds the node URL, a pool of BigchainDB driver connections and the local store.
    Subclasses implement :meth:`save_document` and :meth:`get_document`.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 9984,
                 num_connections: int = 5, local_store: local_stores.SqliteStore = None):
        """
        Instantiate Base Client object

        :param host: BigchainDB Hostname or IP (default: 0.0.0.0)
        :type host: str
        :param port: BigchainDB Port (default: 9984)
        :type port: int
        :param num_connections: Amount of connections made to BigchainDB node (must be > 0)
        :type num_connections: int
        :param local_store: Local database object; a new SqliteStore is created when omitted
        :type local_store: SqliteStore
        """
        assert num_connections > 0
        self.node = 'http://{}:{}'.format(host, str(port))
        # Build a distinct driver object per slot. ``n * [obj]`` would place the very same
        # object in every slot, which defeats the purpose of the pool.
        self.connections = [bd.BigchainDB(self.node) for _ in range(num_connections)]
        self.connection_pool = bdpool.Pool(self.connections)
        # The default store is created here rather than in the signature, so that importing
        # the module does not instantiate a store and clients do not silently share one.
        self.store = local_stores.SqliteStore() if local_store is None else local_store

    def test_transaction(self, tx: dict) -> bool:
        """
        Validate a transaction against BigchainDB

        Runs ``utils.is_valid_tx`` and, if that passes, ``utils.is_block_to_tx_valid``
        on the transaction id, each with a connection taken from the pool.

        :param tx: Transaction to test (must contain the key ``'id'``)
        :type tx: dict
        :return: True if both checks pass
        :rtype: bool
        :raises Exception: With the message "TX is invalid" or "Block is invalid"
        """
        tx_id = tx['id']
        reason = None
        if not utils.is_valid_tx(tx_id, self.connection_pool.get_connection()):
            reason = "TX is invalid"
        elif not utils.is_block_to_tx_valid(tx_id, self.connection_pool.get_connection()):
            reason = "Block is invalid"

        if reason is None:
            return True

        log.error("Test failed: %s", tx_id)
        raise Exception(reason)

    def _get_bigchain_connection(self) -> bd.BigchainDB:
        """
        Returns BigchainDB connection

        :return: BigchainDB connection object picked by the connection pool
        :rtype: bd.BigchainDB
        """
        return self.connection_pool.get_connection()

    def save_document(self, document: object) -> object:
        """
        Abstract method to store a document

        :param document: Document to save
        :type document: object
        :return: id
        :rtype: object
        :raises NotImplementedError: Always; subclasses must override
        """
        raise NotImplementedError("Abstract method")

    def get_document(self, document_id: object) -> provmodel.ProvDocument:
        """
        Abstract method to retrieve a document

        :param document_id: Id of the document to retrieve
        :type document_id: object
        :rtype: ProvDocument
        :raises NotImplementedError: Always; subclasses must override
        """
        raise NotImplementedError("Abstract method")
class DocumentConceptClient(BaseClient):
    """
    Client for the document concept

    The serialized PROV document is handed to a single ``DocumentConceptAccount``
    as one asset and is read back from one transaction id.
    """

    def __init__(self, account_id: str = None, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 1,
                 local_store: local_stores.SqliteStore = None):
        """
        Instantiate Document Client object

        :param account_id: Id passed on to DocumentConceptAccount (default: None)
        :type account_id: str
        :param host: BigchainDB Hostname or IP (default: 0.0.0.0)
        :type host: str
        :param port: BigchainDB Port (default: 9984)
        :type port: int
        :param num_connections: Amount of connections made to BigchainDB node (default: 1)
        :type num_connections: int
        :param local_store: Local database object; a new SqliteStore is created when omitted
        :type local_store: SqliteStore
        """
        # Resolve the default here instead of in the signature: a default written as
        # ``SqliteStore()`` is built once at import time and shared by every client.
        if local_store is None:
            local_store = local_stores.SqliteStore()
        super().__init__(host, port, num_connections, local_store)
        self.account = accounts.DocumentConceptAccount(account_id, self.store)

    def save_document(self, document: str or bytes or provmodel.ProvDocument) -> str:
        """
        Write a document into BigchainDB

        :param document: Document as JSON/XML/PROVN
        :type document: str or bytes or ProvDocument
        :return: Transaction id of document
        :rtype: str
        """
        log.info("Save document...")
        prov_document = utils.to_prov_document(content=document)
        # The whole document travels as one JSON string under the key 'prov'.
        asset = {'prov': prov_document.serialize(format='json')}
        tx_id = self.account.save_asset(asset, self._get_bigchain_connection())
        log.info("Saved document in Tx with id: %s", tx_id)
        return tx_id

    def get_document(self, tx_id: str) -> provmodel.ProvDocument:
        """
        Retrieve a document by transaction id from BigchainDB

        :param tx_id: Transaction Id of Document
        :type tx_id: str
        :return: Document as ProvDocument object
        :rtype: ProvDocument
        :raises Exception: If a retrieved transaction fails :meth:`test_transaction`
        """
        log.info("Retrieve and build document")
        tx = self._get_bigchain_connection().transactions.retrieve(tx_id)
        self.test_transaction(tx)
        # An asset carrying an 'id' only references another asset (presumably a TRANSFER
        # transaction - verify against the BigchainDB schema); follow it to the first
        # transaction of that asset, which holds the actual data.
        if 'id' in tx['asset']:
            tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
            self.test_transaction(tx)
        log.info("Success")
        return utils.to_prov_document(tx['asset']['data']['prov'])
class GraphConceptClient(BaseClient):
    """
    Client for the graph concept

    Every node of the provenance graph gets its own ``GraphConceptAccount``; the
    relations of a node are saved through that account.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 5,
                 local_store: local_stores.SqliteStore = None):
        """
        Instantiate Graph Client object

        :param host: BigchainDB Hostname or IP (default: 0.0.0.0)
        :type host: str
        :param port: BigchainDB Port (default: 9984)
        :type port: int
        :param num_connections: Amount of connections made to BigchainDB node (default: 5)
        :type num_connections: int
        :param local_store: Local database object; a new SqliteStore is created when omitted
        :type local_store: SqliteStore
        """
        # Resolve the default here instead of in the signature: a default written as
        # ``SqliteStore()`` is built once at import time and shared by every client.
        if local_store is None:
            local_store = local_stores.SqliteStore()
        super().__init__(host, port, num_connections, local_store=local_store)
        # All accounts ever created by this client, across save_document calls.
        self.accounts = []

    @staticmethod
    def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
        """
        Transforms a ProvDocument into a list of tuples, one per graph node, holding the
        node, its outgoing relations and the document namespaces

        :param prov_document: Document to transform
        :type prov_document: ProvDocument
        :return: List of tuples(element, relations, namespaces), where relations is a dict
                 with the keys 'with_id' and 'without_id', each mapping to a list of relations
        :rtype: list
        """
        namespaces = prov_document.get_registered_namespaces()
        graph = provgraph.prov_to_graph(prov_document=prov_document)
        elements = []
        for node, neighbours in graph.adjacency():
            relations = {'with_id': [], 'without_id': []}
            # neighbours maps each successor to its parallel edges; every edge's data
            # dict carries the PROV relation under the key 'relation'.
            for parallel_edges in neighbours.values():
                for edge_data in parallel_edges.values():
                    relation = edge_data['relation']
                    bucket = 'with_id' if relation.identifier else 'without_id'
                    relations[bucket].append(relation)
            elements.append((node, relations, namespaces))
        return elements

    def save_document(self, document: str or BufferedReader or provmodel.ProvDocument) -> list:
        """
        Write a document into BigchainDB

        :param document: Document as JSON/XML/PROVN
        :type document: str or BufferedReader or ProvDocument
        :return: List of transaction ids
        :rtype: list
        """
        log.info("Save document...")
        document_tx_ids = []
        prov_document = utils.to_prov_document(content=document)
        elements = GraphConceptClient.calculate_account_data(prov_document)

        log.info("Create and Save instances")
        # Register every identified relation up front; the empty string is the initial
        # placeholder value handed to the accounts via the shared id_mapping.
        id_mapping = {}
        for _, prov_relations, _ in elements:
            for rel in prov_relations['with_id']:
                id_mapping[rel.identifier] = ''

        # Accounts created for THIS document only. Looping over self.accounts below would
        # re-save the relations of every document stored earlier through this client.
        document_accounts = []
        for prov_element, prov_relations, namespaces in elements:
            account = accounts.GraphConceptAccount(prov_element, prov_relations, id_mapping, namespaces, self.store)
            document_accounts.append(account)
            self.accounts.append(account)
            tx_id = account.save_instance_asset(self._get_bigchain_connection())
            document_tx_ids.append(tx_id)

        log.info("Save relations with ids")
        for account in document_accounts:
            if account.has_relations_with_id:
                document_tx_ids += account.save_relations_with_ids(self._get_bigchain_connection())

        log.info("Save relations without ids")
        for account in document_accounts:
            if account.has_relations_without_id:
                document_tx_ids += account.save_relations_without_ids(self._get_bigchain_connection())

        log.info("Saved document in %s Tx", len(document_tx_ids))
        return document_tx_ids

    def get_document(self, document_tx_ids: list) -> provmodel.ProvDocument:
        """
        Retrieve a document by a list transaction ids from BigchainDB

        :param document_tx_ids: Transaction Ids of Document
        :type document_tx_ids: list
        :return: Document as ProvDocument object
        :rtype: ProvDocument
        :raises Exception: If a retrieved transaction fails :meth:`test_transaction`
        """
        log.info("Retrieve and rebuild document...")
        doc = provmodel.ProvDocument()
        for tx_id in document_tx_ids:
            log.info("tx id: %s", tx_id)
            tx = self._get_bigchain_connection().transactions.get(asset_id=tx_id)[0]
            self.test_transaction(tx)
            # An asset carrying an 'id' only references another asset; follow it to the
            # first transaction of that asset, which holds the actual data.
            if 'id' in tx['asset']:
                tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
                self.test_transaction(tx)
            # Merge the namespaces and records of this partial document into the result.
            tmp_doc = utils.to_prov_document(tx['asset']['data']['prov'])
            for namespace in tmp_doc.get_registered_namespaces():
                doc.add_namespace(namespace)
            for record in tmp_doc.get_records():
                doc.add_record(record=record)
        log.info("Success")
        return doc
class RoleConceptClient(BaseClient):
    """
    Client for the role concept

    Every agent of the provenance graph gets its own ``RoleConceptAccount``, which also
    receives the non-agent elements that have an edge to that agent.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 9984, num_connections: int = 5,
                 local_store: local_stores.SqliteStore = None):
        """
        Instantiate Role Client object

        :param host: BigchainDB Hostname or IP (default: 0.0.0.0)
        :type host: str
        :param port: BigchainDB Port (default: 9984)
        :type port: int
        :param num_connections: Amount of connections made to BigchainDB node (default: 5)
        :type num_connections: int
        :param local_store: Local database object; a new SqliteStore is created when omitted
        :type local_store: SqliteStore
        """
        # Resolve the default here instead of in the signature: a default written as
        # ``SqliteStore()`` is built once at import time and shared by every client.
        if local_store is None:
            local_store = local_stores.SqliteStore()
        super().__init__(host, port, num_connections, local_store=local_store)
        # All accounts ever created by this client, across save_document calls.
        self.accounts = []

    @staticmethod
    def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
        """
        Transforms a ProvDocument into a list of tuples including:
        ProvAgent, list of ProvRelations from agent,
        elements associated to ProvAgent,
        list of Namespaces

        The associated elements are a dict ``{index: {element: [relations]}}`` with a
        running integer index per agent.

        :param prov_document: Document to transform
        :type prov_document: ProvDocument
        :return: List of tuples(ProvAgent, list(), dict(), list())
        :rtype: list
        :raises Exception: If the graph is cyclic, has isolated nodes, or contains a
                           non-agent element without an edge to any agent
        """
        namespaces = prov_document.get_registered_namespaces()
        g = provgraph.prov_to_graph(prov_document=prov_document)

        # Check on compatibility. This has to happen before the topological sort,
        # which itself raises on cyclic graphs and would hide the message below.
        if not is_directed_acyclic_graph(g):
            raise Exception("Provenance graph is not acyclic")
        if list(isolates(g)):
            raise Exception("Provenance not compatible with role-based concept. Has isolated Elements")

        sorted_nodes = list(reversed(list(topological_sort(g))))
        agents = [node for node in sorted_nodes if isinstance(node, provmodel.ProvAgent)]
        elements = [node for node in sorted_nodes if not isinstance(node, provmodel.ProvAgent)]

        for element in elements:
            if not any(isinstance(n, provmodel.ProvAgent) for n in g.neighbors(element)):
                raise Exception(
                    "Provenance not compatible with role-based concept. Element {} has not relation to any agent".format(
                        element))

        account_data = []
        for agent in agents:
            # Out-going relations of the agent, one per edge. Reading the relation off each
            # edge (instead of always taking edge key 0) keeps parallel edges intact.
            agent_relations = [relation for _, _, relation in g.out_edges(agent, data='relation')]

            agent_elements = {}
            for element in elements:
                # Only elements with an edge pointing at this agent belong to it.
                if not g.has_edge(element, agent):
                    continue
                element_relations = [relation for _, _, relation in g.out_edges(element, data='relation')]
                agent_elements[len(agent_elements)] = {element: element_relations}

            account_data.append((agent, agent_relations, agent_elements, namespaces))
        return account_data

    def save_document(self, document: str or BufferedReader or provmodel.ProvDocument) -> list:
        """
        Write a document into BigchainDB

        :param document: Document as JSON/XML/PROVN
        :type document: str or BufferedReader or ProvDocument
        :return: List of transaction ids
        :rtype: list
        :raises Exception: If the document is not compatible with the role concept
        """
        log.info("Save document...")
        document_tx_ids = []
        prov_document = utils.to_prov_document(content=document)
        account_data = RoleConceptClient.calculate_account_data(prov_document)
        id_mapping = {}

        log.info("Create and Save instances")
        # Accounts created for THIS document only. Looping over self.accounts below would
        # re-save the elements of every document stored earlier through this client.
        document_accounts = []
        for agent, relations, elements, namespaces in account_data:
            account = accounts.RoleConceptAccount(agent, relations, elements, id_mapping, namespaces, self.store)
            document_accounts.append(account)
            self.accounts.append(account)
            tx_id = account.save_instance_asset(self._get_bigchain_connection())
            document_tx_ids.append(tx_id)

        log.info("Save elements")
        for account in document_accounts:
            document_tx_ids += account.save_elements(self._get_bigchain_connection())

        log.info("Saved document in %s Tx", len(document_tx_ids))
        return document_tx_ids

    def get_document(self, document_tx_ids: list) -> provmodel.ProvDocument:
        """
        Returns a document by a list transaction ids from BigchainDB

        :param document_tx_ids: Transaction Ids of Document
        :type document_tx_ids: list
        :return: Document as ProvDocument object
        :rtype: ProvDocument
        :raises Exception: If a retrieved transaction fails :meth:`test_transaction`
        """
        log.info("Retrieve and rebuild document...")
        doc = provmodel.ProvDocument()
        for tx_id in document_tx_ids:
            tx = self._get_bigchain_connection().transactions.get(asset_id=tx_id)[0]
            self.test_transaction(tx)
            # An asset carrying an 'id' only references another asset; follow it to the
            # first transaction of that asset, which holds the actual data.
            if 'id' in tx['asset']:
                tx = self._get_bigchain_connection().transactions.get(asset_id=tx['asset']['id'])[0]
                self.test_transaction(tx)
            # Merge the namespaces and records of this partial document into the result.
            tmp_doc = utils.to_prov_document(tx['asset']['data']['prov'])
            for namespace in tmp_doc.get_registered_namespaces():
                doc.add_namespace(namespace)
            for record in tmp_doc.get_records():
                doc.add_record(record=record)
        log.info("Success")
        return doc