From 2d039f7b8fadeb49f376f266e710c73e413c273b7dc57257376adace54a3a203 Mon Sep 17 00:00:00 2001
From: Moses Rolston
Date: Sun, 9 Mar 2025 18:58:00 -0700
Subject: [PATCH] add endpoint process sponsors

---
Review notes (this section is not part of the commit message):
- The added code was revised during review, so the post-image blob id on the
  "index" line below is stale; the hunk header and diffstat were recomputed.
- Indentation of the removed lines was reconstructed from the Python syntax.

 api/endpoints/process_sponsors.py | 185 ++++++++++++++++++------------------------
 1 file changed, 94 insertions(+), 91 deletions(-)

diff --git a/api/endpoints/process_sponsors.py b/api/endpoints/process_sponsors.py
index a8fe1b1..a8884da 100644
--- a/api/endpoints/process_sponsors.py
+++ b/api/endpoints/process_sponsors.py
@@ -1,95 +1,98 @@
-# endpoints/process_sponsors.py
-from flask import Blueprint, jsonify
-from app import get_driver, neo4j_logger
-import json
+from flask import Blueprint, jsonify, request
 import os
+import csv
+from neo4j import GraphDatabase
+import logging
 
-bp = Blueprint('process_sponsors', __name__)
+bp = Blueprint('process_sponsored', __name__)
 
-CACHE_FILE = 'cache.json'
+# Custom logger for the process_sponsored blueprint
+process_sponsored_logger = logging.getLogger('ProcessSponsoredLogger')
+process_sponsored_logger.setLevel(logging.INFO)
+process_sponsored_handler = logging.StreamHandler()
+process_sponsored_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+process_sponsored_handler.setFormatter(process_sponsored_formatter)
+process_sponsored_logger.addHandler(process_sponsored_handler)
 
-def load_cache():
-    if os.path.exists(CACHE_FILE):
-        with open(CACHE_FILE, 'r') as f:
-            return json.load(f)
-    return {}
-
-def save_cache(cache_data):
-    with open(CACHE_FILE, 'w') as f:
-        json.dump(cache_data, f)
-
-@bp.route('/process_sponsors')
-def process_sponsors():
-    cache = load_cache()
-
-    if 'legislation_entries' not in cache or len(cache['legislation_entries']) == 0:
-        return jsonify({"message": "No legislation entries found in cache"}), 404
-
-    # Print the number of items found in the cache initially
-    initial_legislation_entries_count = len(cache['legislation_entries'])
-    print(f"Initial legislation entries count: {initial_legislation_entries_count}")
-
-    processed_legislation_count = 0
-
-    while 'legislation_entries' in cache and len(cache['legislation_entries']) > 0:
-        # Step 1: Retrieve a legislation entry from the cache
-        legislation_entry = cache['legislation_entries'].pop(0)
-
-        if not legislation_entry or 'bioguideId' not in legislation_entry:
-            continue
-
-        bioguideId = legislation_entry['bioguideId']
-        legislation_properties = {key: value for key, value in legislation_entry.items() if key != 'bioguideId'}
-
-        # Step 2: Create a legislation node with the properties
-        driver = get_driver()
-        with driver.session() as session:
-            legislation_id = legislation_properties['id']
-            query = f"MATCH (n:Legislation {{id: $legislation_id}}) RETURN n"
-            neo4j_logger.info(f"Executing query: {query} with params: {{legislation_id: '{legislation_id}'}}")
-
-            existing_legislation = session.run(query, legislation_id=legislation_id).single()
-            if not existing_legislation:
-                properties = ', '.join([f'{key}: "${key}"' for key in legislation_properties])
-                query = f"CREATE (n:Legislation {{{properties}}}) RETURN n"
-                neo4j_logger.info(f"Executing query: {query} with data: {legislation_properties}")
-                result = session.run(query, **legislation_properties)
-
-                # Convert the created node to a dictionary
-                new_legislation_node = {
-                    'id': result.single()['n'].id,
-                    'labels': list(result.single()['n'].labels),
-                    **{key: value for key, value in result.single()['n'].items()}
-                }
-                neo4j_logger.info(f"Created legislation node: {new_legislation_node}")
-
-        # Step 3: Create a relationship of type "sponsored" from the sponsor to the legislation
-        with driver.session() as session:
-            person_query = f"MATCH (a:Person {{bioguideId: $bioguideId}}) RETURN a"
-            neo4j_logger.info(f"Executing query: {person_query} with params: {{bioguideId: '{bioguideId}'}}")
-
-            sponsor_node = session.run(person_query, bioguideId=bioguideId).single()
-            if not sponsor_node:
-                neo4j_logger.error(f"Person node does not exist for bioguideId {bioguideId}")
-                continue
-
-            legislation_id = legislation_properties['id']
-            relationship_query = f"MATCH (a:Person {{bioguideId: $bioguideId}}), (b:Legislation {{id: $legislation_id}}) CREATE (a)-[r:sponsored]->(b) RETURN r"
-            neo4j_logger.info(f"Executing query: {relationship_query} with params: {{bioguideId: '{bioguideId}', legislation_id: '{legislation_id}'}}")
-
-            result = session.run(relationship_query, bioguideId=bioguideId, legislation_id=legislation_id)
-            relationship_node = {
-                'id': result.single()['r'].id,
-                'type': "sponsored",
-                **{key: value for key, value in result.single()['r'].items()}
-            }
-            neo4j_logger.info(f"Created sponsored relationship: {relationship_node}")
-
-        processed_legislation_count += 1
-
-    save_cache(cache)
-
-    # Print the total number of legislation items processed
-    print(f"Total processed legislation count: {processed_legislation_count}")
-
-    return jsonify({"message": "Sponsorship processing completed successfully", "processed_legislation_count": processed_legislation_count}), 200
+def _clean_row(row):
+    """Return a CSV row's usable columns with string values stripped.
+
+    csv.DictReader files surplus cells under the key None and fills missing
+    cells with None; neither can appear in a MERGE pattern, so both are
+    dropped, as are columns with an empty header name.
+    """
+    return {
+        key: value.strip() if isinstance(value, str) else value
+        for key, value in row.items()
+        if key and value is not None
+    }
+
+def _merge_query(properties):
+    """Build the MERGE statement that matches a legislation node on every property.
+
+    Property names come from the CSV header, so each is backtick-quoted (with
+    embedded backticks doubled) to keep spaces or punctuation in a header from
+    breaking or altering the query. Values travel in the $props parameter.
+    """
+    names = ["`" + key.replace("`", "``") + "`" for key in properties]
+    return (
+        "MERGE (l:legislation {"
+        + ", ".join(f"{name}: $props.{name}" for name in names)
+        + "})"
+    )
+
+@bp.route('/process_sponsored', methods=['GET'])
+def process_sponsored():
+    """Load the sponsored-legislation CSV into Neo4j as legislation nodes.
+
+    Reads the CSV named by SPONSORED_LEGISLATION_CSV and MERGEs one legislation
+    node per data row, using every column as a property. Connection settings
+    come from NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD.
+
+    Returns:
+        A JSON body and status code: 400 when the CSV path is not configured,
+        500 with the error text when reading the file or talking to Neo4j
+        fails, otherwise 200 with the number of rows written.
+    """
+    csv_file_path = os.getenv("SPONSORED_LEGISLATION_CSV")
+
+    if not csv_file_path:
+        return jsonify({"error": "SPONSORED_LEGISLATION_CSV environment variable is not set"}), 400
+
+    processed = 0
+    try:
+        # utf-8-sig also reads plain UTF-8 and keeps a BOM out of the first
+        # column name.
+        with open(csv_file_path, mode='r', newline='', encoding='utf-8-sig') as file:
+            reader = csv.DictReader(file)
+            driver = GraphDatabase.driver(
+                os.getenv("NEO4J_URI"),
+                auth=(os.getenv("NEO4J_USER"), os.getenv("NEO4J_PASSWORD")),
+            )
+            # Context managers close the session and driver even when a row
+            # fails part-way through the file.
+            with driver, driver.session() as session:
+                for row in reader:
+                    properties = _clean_row(row)
+                    if not properties:
+                        # An empty MERGE pattern would match any legislation node.
+                        continue
+
+                    # Log the CSV row
+                    process_sponsored_logger.info("Processing row: %s", properties)
+
+                    query = _merge_query(properties)
+
+                    # Log the MERGE query
+                    process_sponsored_logger.info("Executing query: %s", query)
+
+                    session.run(query, props=properties)
+                    processed += 1
+
+        # Count rows actually written: reader.line_num counts physical lines,
+        # so it reported -1 for an empty file.
+        return jsonify({"message": f"Processed {processed} sponsored legislations"}), 200
+
+    except Exception as e:
+        process_sponsored_logger.error("Error processing sponsored legislation: %s", e)
+        return jsonify({"error": str(e)}), 500