Parallel computing in Python with the pp library

Programmer Article Station, 2022-07-13 21:34:45

Why use pp for parallel computing?
Answer: because it is simple.

# -*- coding: UTF-8 -*-
"""
Created on Thu Oct 19 11:11:45 2017

@author: Administrator
"""

import math, sys, time
import pp
def IsPrime(n):
    """Return True if n is a prime number."""
    if not isinstance(n, int):
        raise TypeError("argument passed to IsPrime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        return True
    # Trial division by every candidate up to ceil(sqrt(n))
    limit = int(math.ceil(math.sqrt(n)))
    i = 2
    while i <= limit:
        if n % i == 0:
            return False
        i += 1
    return True
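def SumPrimes(n):
    """Return the sum of all primes from 2 up to, but not including, n."""
    # Repeat the summation 15 times and discard the result, purely to
    # make each job heavy enough for the timing comparison.
    for i in range(15):
        sum([x for x in range(2,n) if IsPrime(x)])
    return sum([x for x in range(2,n) if IsPrime(x)])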
inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)
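# Serial baseline for comparison (disabled by the surrounding string literal)
'''
start_time = time.time()
for input in inputs:
    print ( SumPrimes(input))
print ('Single-threaded run, total time', time.time() - start_time, 's')
'''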
# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)
if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)
print ("pp is running with", job_server.get_ncpus(), "workers")
start_time = time.time()
jobs = [(input, job_server.submit(SumPrimes,(input,), (IsPrime,), ("math",))) for input in inputs]

for input, job in jobs:
    # Calling job() blocks until that job has finished and returns its result
    (input,job())
    #print ("Sum of primes below", input, "is", job())
print ("Parallel execution time: ", time.time() - start_time, "s")

job_server.print_stats()

Output:

pp is running with 4 workers
Parallel execution time:  23.168389320373535 s
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         8 |        100.00 |      89.4352 |    11.179397 | local
Time elapsed since server creation 23.169389247894287
0 active tasks, 4 cores
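
To read these statistics, a little arithmetic helps. The sketch below only reuses the numbers printed above; it assumes that "job time sum" is a fair stand-in for the time the same eight jobs would take one after another, which is an approximation because the workers share the machine while they run.

```python
# Figures copied from the print_stats() output above.
job_count = 8
job_time_sum = 89.4352          # seconds of work, summed over all jobs
elapsed = 23.169389247894287    # wall-clock seconds since server creation
workers = 4

# Average time per job; should agree with the "time per job" column.
time_per_job = job_time_sum / job_count

# Rough speedup: total work done divided by the wall-clock time it took.
speedup = job_time_sum / elapsed

# Fraction of the ideal 4x speedup that was actually reached.
efficiency = speedup / workers

print("time per job: %.4f s" % time_per_job)
print("approximate speedup: %.2fx" % speedup)
print("parallel efficiency: %.1f%%" % (efficiency * 100))
```

Under that assumption, the four workers were kept busy for almost the whole run.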

In my own work I need to call the function save_to_mongo many times.
These are the functions that the main function save_to_mongo calls:

function=(common_rent_price,add_block,add_room,take_location,
some_district_information,neighbor,
most_frecuncy_rent,mean_rent_price,
no_source_price,block_price,
block_rule,RENT_no_data_mostblock_price,
region_district_list,add_rent_type,)

Modules that need to be imported:
stock=("os","collections","pymongo","numpy","pandas",)
Submitting the jobs:

jobs = [(input, job_server.submit(save_to_mongo,(ct,rl,input,20170501), function, stock)) for input in inputs]
Running the jobs:
for input, job in jobs:
    (input,job())

The complete script:

import math, sys, time
import pp

inputs = plg  # plg is the input list from my project; its definition is not shown here
'''
start_time = time.time()
for input in inputs:
    print ( SumPrimes(input))
print ('Single-threaded run, total time', time.time() - start_time, 's')
'''
# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)
if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)
print ("pp is running with", job_server.get_ncpus(), "workers")
start_time = time.time()

# Functions that the main function save_to_mongo needs to call
function=(common_rent_price,add_block,add_room,take_location,
         some_district_information,neighbor,
         most_frecuncy_rent,mean_rent_price,
         no_source_price,block_price,
         block_rule,RENT_no_data_mostblock_price,
         region_district_list,add_rent_type,)


stock=("os","collections","pymongo","numpy","pandas",)
jobs = [(input, job_server.submit(save_to_mongo,(ct,rl,input,20170501), function, stock)) for input in inputs]

for input, job in jobs:
    # Calling job() blocks until that job has finished
    (input,job())
    print ("Result for", input, "is", job())
    #print('index=',disname.index(input))
print ("Parallel execution time: ", time.time() - start_time, "s")

job_server.print_stats()    

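A note on the modules to import: list each module under its plain name, as in import numpy, and inside the functions that run on the workers refer to it by that full name, for example numpy.array([xxx]).
Do not rely on an alias such as import numpy as np. The workers import the modules by the names given in the tuple, so the name np would not be defined there.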
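
Why the alias fails can be shown without pp at all. The sketch below is a rough emulation, not pp's actual code: it assumes a worker rebuilds a function from its source text in a fresh namespace that contains only the modules named in the tuple. The helper run_in_fresh_namespace and both source strings are made up for this illustration, and math stands in for numpy so that nothing outside the standard library is needed.

```python
import importlib


def run_in_fresh_namespace(source, func_name, module_names, *args):
    """Define a function from source in an empty namespace and call it.

    Only the modules listed in module_names are visible, each under its
    own plain name (hypothetical stand-in for what a worker would see).
    """
    namespace = {}
    for name in module_names:
        namespace[name] = importlib.import_module(name)
    exec(source, namespace)
    return namespace[func_name](*args)


# Written with the full module name: works in the fresh namespace.
GOOD_SOURCE = """
def root(x):
    return math.sqrt(x)
"""

# Written against an alias ("import math as m" in the parent script):
# the name m does not exist in the fresh namespace.
BAD_SOURCE = """
def root(x):
    return m.sqrt(x)
"""

good_result = run_in_fresh_namespace(GOOD_SOURCE, "root", ("math",), 16.0)

try:
    run_in_fresh_namespace(BAD_SOURCE, "root", ("math",), 16.0)
    alias_failed = False
except NameError:
    alias_failed = True

print("full name result:", good_result)
print("alias version raised NameError:", alias_failed)
```

The same reasoning applies to numpy.array versus np.array in the functions passed to job_server.submit.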
