
Apache Spark for Dealing with Ethereum Transaction Data

Reading the File

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col
from graphframes import *

# In the pyspark shell a SparkSession named `spark` already exists;
# in a standalone script, create it explicitly:
spark = SparkSession.builder.getOrCreate()

file_path = "xi_3777501to3800000 block.txt"

txn_fields = [
    StructField("block_id", StringType(), True),      # block number
    StructField("time_stamp", StringType(), True),    # Unix timestamp
    StructField("tx_hash", StringType(), True),       # transaction hash
    StructField("method", StringType(), True),        # e.g. Call
    StructField("from_mac", StringType(), True),      # sender Ethereum address (despite the "mac" naming)
    StructField("to_mac", StringType(), True),        # recipient Ethereum address
    StructField("gas_limit", StringType(), True),
    StructField("value", StringType(), True),         # transfer amount in wei, hex-encoded
    StructField("new_contract", StringType(), True),
    StructField("code", StringType(), True),
    StructField("is_succesful", StringType(), True)   # e.g. OK
]

txn = spark.read.csv(file_path, sep=",", header=False, schema=StructType(txn_fields))

txn.head()          # peek at the first row
txn.count()         # number of transactions loaded
txn.printSchema()   # confirm the column layout

from_mac = txn.select('from_mac')
to_mac = txn.select('to_mac')

Transaction Raw Data Example

3777501,1495909739,0xad9b464ef42fe9ed6eaec06e3fb31a845e88558d9bacb92f7dc24a655a5d6d29,Call,0x32Be343B94f860124dC4fEe278FDCBD38C102D88,0x71FA4943af0c6E4BE6dE30680290d8be3c816536,312333,0x297e9d28866b0000,,,OK

Transaction DataFrame Row Example

Row(block_id='3777501', time_stamp='1495909739', tx_hash='0xad9b464ef42fe9ed6eaec06e3fb31a845e88558d9bacb92f7dc24a655a5d6d29', method='Call', from_mac='0x32Be343B94f860124dC4fEe278FDCBD38C102D88', to_mac='0x71FA4943af0c6E4BE6dE30680290d8be3c816536', gas_limit='312333', value='0x297e9d28866b0000', new_contract=None, code=None, is_succesful='OK')
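Every column above is read as a plain string, including the hex-encoded value. If numeric work is needed downstream, here is a minimal sketch of how the columns could be cast; the value_wei column name and the decoding UDF are assumptions, not part of the original pipeline:

from pyspark.sql.functions import udf, from_unixtime

# Hypothetical helper: decode the hex-encoded wei amount. Returned as a
# decimal string because wei values can exceed a 64-bit long.
hex_to_dec = udf(lambda h: str(int(h, 16)) if h else None, StringType())

txn_typed = (txn
    .withColumn("block_id", col("block_id").cast("long"))
    .withColumn("time_stamp", from_unixtime(col("time_stamp").cast("long")))
    .withColumn("gas_limit", col("gas_limit").cast("long"))
    .withColumn("value_wei", hex_to_dec(col("value"))))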

Merge, Deduplicate, and Save

from functools import reduce
from pyspark.sql import DataFrame

def union_all(*dfs):
    # DataFrame.union matches columns by position, so the two single-column
    # frames below merge cleanly even though their column names differ;
    # the result keeps the first frame's name, from_mac.
    return reduce(DataFrame.union, dfs)

from_mac = txn.select('from_mac').distinct()  # 187986 distinct senders
to_mac = txn.select('to_mac').distinct()  # 223223 distinct recipients

all_mac = union_all(from_mac, to_mac).distinct()  # 244350 unique addresses

# coalesce(1) forces a single partition so only one part file is written
# (the header option is a no-op for the text format)
all_mac.coalesce(1).write.format("text").option("header", "false").mode("append").save("txn_data")
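Note that save("txn_data") produces a directory containing a part-* file rather than a single nodes.txt. One way to obtain the file the next section reads is to rename that part file; this post-processing step is assumed here, not shown in the original:

import glob, shutil

# Locate the single part file produced by coalesce(1) and move it to the
# path the next section expects.
part_file = glob.glob("txn_data/part-*")[0]
shutil.move(part_file, "txn_data/nodes.txt")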

Sorting by a Column

node_path = "./txn_data/nodes.txt"

n_fields = [
    StructField("mac_address", StringType(), True)  # one address per line
]

nodes = spark.read.csv(node_path, sep=",", header=False, schema=StructType(n_fields))

# Sort the addresses lexicographically
ordered_node = nodes.orderBy("mac_address")
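The graphframes import at the top is never exercised in the snippets above. As a closing sketch of where this data is headed, the deduplicated addresses and the raw transactions fit GraphFrame's vertex/edge convention directly; the column renames and the degree query below are assumptions, not from the original:

# GraphFrame expects an "id" column on vertices; all_mac kept the column
# name "from_mac" from the positional union, so rename it.
vertices = all_mac.withColumnRenamed("from_mac", "id")

# Edges need "src" and "dst" columns.
edges = txn.selectExpr("from_mac AS src", "to_mac AS dst", "tx_hash", "value")

g = GraphFrame(vertices, edges)

# Example query: the ten addresses that receive the most transactions.
g.inDegrees.orderBy(col("inDegree").desc()).show(10)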