add sponsored and cosponsored endpoints

This commit is contained in:
Moses Rolston 2025-03-09 16:41:33 -07:00
parent 084ddc26d3
commit 33dd47226b
2 changed files with 169 additions and 115 deletions

View File

@ -0,0 +1,99 @@
from flask import Blueprint, jsonify, request
import requests
import csv
import os
import logging
import json
bp = Blueprint('get_cosponsored', __name__)
# Function to retrieve bioguideIds from cache.json file
def get_bioguideIds_from_cache():
CACHE_PATH = os.getenv("CACHE_PATH")
if not CACHE_PATH:
logging.error("CACHE_PATH not found in .env file")
return None
if not os.path.exists(CACHE_PATH):
logging.error("Cache file not found at specified path")
return None
with open(CACHE_PATH, 'r') as file:
cache_data = json.load(file)
bioguideIds = cache_data.get("bioguideIds", [])
if not bioguideIds:
logging.error("bioguideIds not found in cache.json")
return None
return bioguideIds
except Exception as e:
logging.error(f"Failed to read cache file: {str(e)}")
return None
# Function to flatten nested dictionaries and lists
def flatten_dict(d, parent_key='', sep='_'):
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
elif isinstance(v, list):
for i, item in enumerate(v):
items.append((f"{new_key}_{i}", flatten_dict(item, '', sep=sep)))
items.append((new_key, v if v is not None and v != "" else "NONE"))
return dict(items)
# Function to write data to CSV
def write_to_csv(data, filename):
keys = set()
for item in data:
with open(filename, 'w', newline='') as output_file:
dict_writer = csv.DictWriter(output_file, fieldnames=keys)
@bp.route('/get_cosponsored', methods=['GET'])
def get_cosponsored_legislation():
# Retrieve bioguideIds from cache.json
bioguideIds = get_bioguideIds_from_cache()
if not bioguideIds:
return jsonify({"error": "bioguideIds not found"}), 404
all_data = []
for bioguideId in bioguideIds:
# Make request to Congress API
api_key = os.getenv("CONGRESS_API_KEY")
url = f"{bioguideId}/cosponsored-legislation?api_key={api_key}"
response = requests.get(url)
if response.status_code != 200:
logging.error(f"Failed to retrieve cosponsored legislation for bioguideId {bioguideId}: {response.text}")
return jsonify({"error": f"Failed to retrieve data from Congress API for bioguideId {bioguideId}"}), 500
data = response.json().get("cosponsoredLegislation", [])
# Add cosponsored_by column and handle nested items
for item in data:
flattened_item = flatten_dict(item)
flattened_item["cosponsored_by"] = bioguideId
if not any(flattened_item.values()):
continue # Skip empty rows
if not all_data:
return jsonify({"error": "No cosponsored legislation found for the given bioguideIds"}), 404
# Write data to CSV
csv_filename = f"cosponsored_legislation.csv"
write_to_csv(all_data, csv_filename)
return jsonify({"message": "Data written to CSV successfully", "filename": csv_filename})
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return jsonify({"error": str(e)}), 500

View File

@ -1,144 +1,99 @@
from flask import Blueprint, jsonify
from flask import Blueprint, jsonify, request
import requests
import json
import csv
import os
import pandas as pd
# Assuming you have these functions and configurations in your
from app import get_driver, neo4j_logger
import logging
import json
bp = Blueprint('get_sponsored', __name__)
CACHE_FILE = 'cache.json'
CSV_FILE = 'legislation.csv'
def load_cache():
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, 'r') as f:
return json.load(f)
else:"Cache file {CACHE_FILE} does not exist. Initializing empty cache.")
return {}
# Function to retrieve bioguideIds from cache.json file
def get_bioguideIds_from_cache():
CACHE_PATH = os.getenv("CACHE_PATH")
if not CACHE_PATH:
logging.error("CACHE_PATH not found in .env file")
return None
def save_cache(cache_data):
with open(CACHE_FILE, 'w') as f:
json.dump(cache_data, f)"Saved data to cache file: {CACHE_FILE}")
if not os.path.exists(CACHE_PATH):
logging.error("Cache file not found at specified path")
return None
def write_to_csv(legislation_data, csv_file):
# Flatten the nested dictionaries
flattened_legislation = []
for item in legislation_data:
flattened_item = {}
flatten_dict(item, "", flattened_item)
df = pd.DataFrame(flattened_legislation)
# Debugging: Print the first few entries of the DataFrame to inspect its structure
print("Debugging DataFrame:")
if not df.empty:
print("DataFrame is empty.")
if df.empty:
neo4j_logger.warning(f"No data to write to CSV file: {csv_file}")
print("DataFrame is empty. Debugging information:")
for item in flattened_legislation[:5]: # Print first 5 items
print(json.dumps(item, indent=4))
df.to_csv(csv_file, index=False)"Data written to CSV file: {csv_file}")
with open(CACHE_PATH, 'r') as file:
cache_data = json.load(file)
bioguideIds = cache_data.get("bioguideIds", [])
if not bioguideIds:
logging.error("bioguideIds not found in cache.json")
return None
return bioguideIds
except Exception as e:
logging.error(f"Failed to read cache file: {str(e)}")
return None
# Function to flatten nested dictionaries and lists
def flatten_dict(d, parent_key='', sep='_'):
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
# Debugging: Print the key and value when entering a nested dictionary
print(f"Entering nested dictionary with key: {new_key}")
items.extend(flatten_dict(v, new_key, sep=sep).items())
elif isinstance(v, list):
# Handle lists by converting them to strings or other appropriate representation
if v:
items.append((new_key, ', '.join(map(str, v))))
for i, item in enumerate(v):
items.append((f"{new_key}_{i}", flatten_dict(item, '', sep=sep)))
# If the list is empty, add an empty string or a placeholder
items.append((new_key, ''))
elif v is not None:
# Debugging: Print the key and value when adding a non-dict, non-list item
print(f"Adding {new_key}: {v}")
items.append((new_key, v))
# Handle None values appropriately (e.g., add an empty string or a placeholder)
items.append((new_key, ''))
items.append((new_key, v if v is not None and v != "" else "NONE"))
return dict(items)
def get_sponsored():
# Load bioguideIds and legislation from the same cache
cache = load_cache()
if 'bioguideIds' not in cache or len(cache['bioguideIds']) == 0:
return jsonify({"message": "No bioguideIds found in cache"}), 404
# Function to write data to CSV
def write_to_csv(data, filename):
keys = set()
for item in data:
# Print the number of items found in the cache initially
initial_bioguideIds_count = len(cache['bioguideIds'])
print(f"Initial bioguideIds count: {initial_bioguideIds_count}")
with open(filename, 'w', newline='') as output_file:
dict_writer = csv.DictWriter(output_file, fieldnames=keys)
processed_legislation_count = 0
all_legislation_data = []
@bp.route('/get_sponsored', methods=['GET'])
def get_sponsored_legislation():
# Retrieve bioguideIds from cache.json
bioguideIds = get_bioguideIds_from_cache()
if not bioguideIds:
return jsonify({"error": "bioguideIds not found"}), 404
while 'bioguideIds' in cache and len(cache['bioguideIds']) > 0:
# Step 1: Retrieve a sponsor from the cache
current_bioguideId = cache['bioguideIds'].pop(0)
if current_bioguideId is None:
all_data = []
print(f"Processing bioguideId: {current_bioguideId}")
congress_api_url = f"{current_bioguideId}/sponsored-legislation"
# Include API key in headers (if required)
api_key = os.getenv('CONGRESS_API_KEY')
if not api_key:
neo4j_logger.error("Congress API key not found in environment variables")
headers = {
'X-API-KEY': api_key
# Step 2: Fetch sponsored legislation for the member
response = requests.get(congress_api_url, headers=headers)
print(f"Response Status Code: {response.status_code}")
print(f"Response Text: {response.text}")
for bioguideId in bioguideIds:
# Make request to Congress API
api_key = os.getenv("CONGRESS_API_KEY")
url = f"{bioguideId}/sponsored-legislation?api_key={api_key}"
response = requests.get(url)
if response.status_code != 200:
neo4j_logger.error(f"Failed to fetch sponsored legislation for bioguideId {current_bioguideId}: Status Code {response.status_code}, Response: {response.text}")
logging.error(f"Failed to retrieve sponsored legislation for bioguideId {bioguideId}: {response.text}")
return jsonify({"error": f"Failed to retrieve data from Congress API for bioguideId {bioguideId}"}), 500
response_data = response.json()
data = response.json().get("sponsoredLegislation", [])
# Debugging statement to check the raw API response
print("Raw API Response:")
print(json.dumps(response_data, indent=4))
# Add sponsored_by column and handle nested items
for item in data:
flattened_item = flatten_dict(item)
flattened_item["sponsored_by"] = bioguideId
if not any(flattened_item.values()):
continue # Skip empty rows
# Extract legislation data from the response
if 'sponsoredLegislation' in response_data and len(response_data['sponsoredLegislation']) > 0:
for result in response_data['sponsoredLegislation']:
if not all_data:
return jsonify({"error": "No sponsored legislation found for the given bioguideIds"}), 404
# Debugging statement to check the number of legislation items collected
print(f"Number of legislation items collected: {len(all_legislation_data)}")
# Write data to CSV
csv_filename = f"sponsored_legislation.csv"
write_to_csv(all_data, csv_filename)
if len(all_legislation_data) > 0:
# Print first few items to ensure data is structured correctly
for i, item in enumerate(all_legislation_data[:5]):
print(f"\nLegislation Item {i+1}:")
print(json.dumps(item, indent=4))
return jsonify({"message": "Data written to CSV successfully", "filename": csv_filename})
# Write the extracted legislation data to a CSV file
write_to_csv(all_legislation_data, CSV_FILE)
return jsonify({"message": "Legislation data written to CSV successfully"}), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return jsonify({"error": str(e)}), 500