Required tools
- Neo4j Python driver (version 4.2 at the time of writing; a quick version check is sketched right after this list)
- Jupyter Notebook/Lab or Google Colab (optional)
- pandas
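A minimal environment check (my own sketch, not part of the original walkthrough) to confirm that the driver and pandas import cleanly and report their versions:

import neo4j
import pandas as pd

# Both packages expose __version__; the driver is expected to be in the 4.2.x range here.
print(neo4j.__version__)
print(pd.__version__)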
import json
import pandas as pd
from tqdm import tqdm

file = "./arxiv-metadata-oai-snapshot.json"
metadata = []

lines = 100000    # read 100k lines for testing
with open(file, 'r') as f:
    for line in tqdm(f):
        metadata.append(json.loads(line))
        lines -= 1
        if lines == 0:
            break

df = pd.DataFrame(metadata)
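The column types of the resulting DataFrame can be inspected next; the listing below appears to be the output of df.dtypes, with every column read in as a generic object (i.e. a string):

# Inspect the inferred column types of the loaded metadata.
print(df.dtypes)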
id                object
submitter         object
authors           object
title             object
comments          object
journal-ref       object
doi               object
report-no         object
categories        object
license           object
abstract          object
versions          object
update_date       object
authors_parsed    object
Note how the raw data comes through: author names arrive as [last, first, suffix] lists, and several categories can be packed into a single string, as the sample records below show:

╒═════════════════════════════════════╕
│ "n"                                 │
╞═════════════════════════════════════╡
│ {"name": ["Balázs", "C.", ""]}      │
├─────────────────────────────────────┤
│ {"name": ["Berger", "E. L.", ""]}   │
├─────────────────────────────────────┤
│ {"name": ["Nadolsky", "P. M.", ""]} │
├─────────────────────────────────────┤
│ {"name": ["Yuan", "C. -P.", ""]}    │
├─────────────────────────────────────┤
│ {"name": ["Streinu", "Ileana", ""]} │
└─────────────────────────────────────┘

╒═════════════════════════════════╕
│ "c"                             │
╞═════════════════════════════════╡
│ {"category": "hep-ph"}          │
├─────────────────────────────────┤
│ {"category": "math.CO cs.CG"}   │
├─────────────────────────────────┤
│ {"category": "physics.gen-ph"}  │
├─────────────────────────────────┤
│ {"category": "math.CO"}         │
├─────────────────────────────────┤
│ {"category": "math.CA math.FA"} │
└─────────────────────────────────┘
def get_author_list(line):
    # Clean the authors_parsed column and build a list of author names for the row.
    return [e[1] + ' ' + e[0] for e in line]

def get_category_list(line):
    # Clean the categories column and build a list of categories for the row.
    return list(line.split(" "))
df['cleaned_authors_list'] = df['authors_parsed'].map(get_author_list)
df['category_list'] = df['categories'].map(get_category_list)

df = df.drop(['submitter', 'authors', 'comments', 'journal-ref', 'doi',
              'report-no', 'license', 'versions', 'update_date',
              'abstract', 'authors_parsed', 'categories'], axis=1)
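A quick spot check of the two new columns (my own sketch; the example values assume the first record is the hep-ph paper by Balázs et al. shown in the samples above):

# Hypothetical spot check of one row after cleaning.
print(df.loc[0, 'cleaned_authors_list'])   # e.g. ['C. Balázs', 'E. L. Berger', 'P. M. Nadolsky', 'C. -P. Yuan']
print(df.loc[0, 'category_list'])          # e.g. ['hep-ph']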
from neo4j import GraphDatabase

class Neo4jConnection:

    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)

    def close(self):
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally:
            if session is not None:
                session.close()
        return response
conn = Neo4jConnection(uri="bolt://52.87.205.91:7687", user="neo4j", pwd="difficulties-pushup-gaps")
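One step that is easy to miss: MERGE-heavy loads like the ones below run much faster when the merged properties are backed by uniqueness constraints. A sketch of that setup, assuming Neo4j 4.x constraint syntax and the conn object defined above:

# Hypothetical one-time setup: uniqueness constraints speed up the MERGE lookups used during loading.
conn.query('CREATE CONSTRAINT papers IF NOT EXISTS ON (p:Paper) ASSERT p.id IS UNIQUE')
conn.query('CREATE CONSTRAINT authors IF NOT EXISTS ON (a:Author) ASSERT a.name IS UNIQUE')
conn.query('CREATE CONSTRAINT categories IF NOT EXISTS ON (c:Category) ASSERT c.category IS UNIQUE')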
def add_categories(categories):
    # Add category nodes to the Neo4j graph.
    query = '''
            UNWIND $rows AS row
            MERGE (c:Category {category: row.category})
            RETURN count(*) as total
            '''
    return conn.query(query, parameters={'rows': categories.to_dict('records')})

def add_authors(rows, batch_size=10000):
    # Add author nodes to the Neo4j graph as a batch job.
    query = '''
            UNWIND $rows AS row
            MERGE (:Author {name: row.author})
            RETURN count(*) as total
            '''
    return insert_data(query, rows, batch_size)
import time

def insert_data(query, rows, batch_size=10000):
    # Update the Neo4j database in batches.
    total = 0
    batch = 0
    start = time.time()
    result = None

    while batch * batch_size < len(rows):
        # Send the next slice of rows as query parameters.
        res = conn.query(query,
                         parameters={'rows': rows[batch * batch_size:(batch + 1) * batch_size].to_dict('records')})
        total += res[0]['total']
        batch += 1
        result = {"total": total, "batches": batch, "time": time.time() - start}
        print(result)

    return result
def add_papers(rows, batch_size=5000):
    # Add paper nodes plus the (:Author)--(:Paper) and (:Paper)--(:Category) relationships.
    query = '''
            UNWIND $rows as row
            MERGE (p:Paper {id: row.id}) ON CREATE SET p.title = row.title

            // connect categories
            WITH row, p
            UNWIND row.category_list AS category_name
            MATCH (c:Category {category: category_name})
            MERGE (p)-[:IN_CATEGORY]->(c)

            // connect authors
            WITH distinct row, p // reduce cardinality
            UNWIND row.cleaned_authors_list AS author
            MATCH (a:Author {name: author})
            MERGE (a)-[:AUTHORED]->(p)
            RETURN count(distinct p) as total
            '''
    return insert_data(query, rows, batch_size)
categories = pd.DataFrame(df[['category_list']])
categories.rename(columns={'category_list': 'category'}, inplace=True)
categories = categories.explode('category') \
                       .drop_duplicates(subset=['category'])

authors = pd.DataFrame(df[['cleaned_authors_list']])
authors.rename(columns={'cleaned_authors_list': 'author'}, inplace=True)
authors = authors.explode('author').drop_duplicates(subset=['author'])
add_categories(categories)
add_authors(authors)
add_papers(df)
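A small post-load verification of my own (not part of the original listing), counting the nodes created for each label through the same conn helper:

# Hypothetical sanity check: node counts per label after the load finishes.
for label in ['Paper', 'Author', 'Category']:
    res = conn.query(f'MATCH (n:{label}) RETURN count(n) AS count')
    print(label, res[0]['count'])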
MATCH (a:Author)-[:AUTHORED]->(p:Paper)-[:IN_CATEGORY]->(c:Category) RETURN a, p, c LIMIT 300
query_string = '''
MATCH (c:Category)
RETURN c.category, SIZE(()-[:IN_CATEGORY]->(c)) AS inDegree
ORDER BY inDegree DESC LIMIT 20
'''
top_cat_df = pd.DataFrame([dict(_) for _ in conn.query(query_string)])
top_cat_df.head(20)
result = conn.query(query_string)
for record in result:
    print(record['c.category'], record['inDegree'])