sparkpythonhbase(有没有Python写的spark连接Hbase的例子)

1.有没有Python写的spark连接Hbase的例子

博主项目实践中,经常需要用Spark从Hbase中读取数据。

其中,spark的版本为1.6,hbase的版本为0.98。现在记录一下如何在spark中操作读取hbase中的数据。

对于这种操作型的需求,没有什么比直接上代码更简单明了的了。so,show me the code!object Demo extends Logging{val CF_FOR_FAMILY_USER = Bytes.toBytes("U");val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")val QF_FOR_MODEL = Bytes.toBytes("model")val HBASE_CLUSTER = "hbase://xxx/"val TABLE_NAME = "xxx";val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAMEdef genData(sc:SparkContext) = {//20161229的数据,rowkey的设计为9999-yyyyMMddval filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770"))//得到qf为w:00-23的数据val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w"))val all_filters = new util.ArrayList[Filter]()all_filters.add(filter_of_1229)all_filters.add(filter_of_qf)//hbase多个过滤器val filterList = new FilterList(all_filters)val scan = new Scan().addFamily(CF_FOR_FAMILY_USER)scan.setFilter(filterList)scan.setCaching(1000)scan.setCacheBlocks(false)val conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE )conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[],classOf[Result])//后面是针对hbase查询结果的具体业务逻辑.map()。

def main(args: Array[String]): Unit = {val Array(output_path) = argsval sparkConf = new SparkConf().setAppName("demo")sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val sc = new SparkContext(sparkConf)genUuidWifi(sc).saveAsTextFile(output_path)sc.stop()}}需要注意的一个小点就是如果hbase里有多个过滤器,注意需要使用FilterList。

2.spark 读 hbase parquet 哪个快

spark读hbase,生成task受所查询table的region个数限制,任务数有限,例如查询的40G数据,10G一个region,很可能就4~6个region,初始的task数就只有4~6个左右,RDD后续可以partition设置task数;

spark读parquet按默认的bolck个数生成task个数,例如128M一个bolck,差不多就是300多个task,初始载入情况就比hbase快,而且直接载入parquet文件到spark的内存,而hbase还需要同regionserver交互把数据传到spark的内存也是需要消耗时间的。

总体来说,读parquet更快

sparkpythonhbase

3.如何在Python中访问HBase的数据

python连接hbase时需要先加载thrift和hbase的相关包,之后创建与hbase的连接并进行后续操作,具体代码如下:

# -*- coding: utf-8 -*-

import sys

reload(sys)

sys.setdefaultencoding('utf-8')

from thrift.transport.tsocket import tsocket

from thrift.transport.ttransport import tbufferedtransport

from thrift.protocol import tbinaryprotocol

from hbase import hbase

from hbase.ttypes import *

import pymongo

import hashlib

import time

from datetime import datetime

class hbaseoperator():

def __init__(self):

self.host = "ip_address"

self.port = 9090

self.transport = tbufferedtransport(tsocket(self.host, self.port))

self.transport.open()

self.protocol = tbinaryprotocol.tbinaryprotocol(self.transport)

self.client = hbase.client(self.protocol)

def __del__(self):

self.transport.close()

def getalltablesinfo(self):

#get table info

listtables = self.client.gettablenames()

print "="*40

print "show all tables information。."

for tablename in listtables:

print "tablename:" + tablename

print " "

listcolumns = self.client.getcolumndescriptors(tablename)

print listcolumns

print " "

listtableregions = self.client.gettableregions(tablename)

print listtableregions

print "+"*40

4.如何使用Spark/Scala读取Hbase的数据

必须使用高亮参数启动Spark-shell,否则当你遍历RDD时会出现如下的Exception

java.io.: org.apache.hadoop.hbase.io.

spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

以下代码,经过MaprDB实测通过

import org.apache.spark._

import org.apache.spark.rdd.NewHadoopRDD

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

import org.apache.hadoop.hbase.client.HBaseAdmin

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HColumnDescriptor

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.HTable;

val tableName = "/app//TRANSAC_ID"

val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, tableName)

//create rdd

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

classOf[org.apache.hadoop.hbase.io.],

classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.take(2).map(row=>row._2.rawCells).

map(_.map( kv => (new String(kv.getQualifier()) -> new

String(kv.getValue()) ) ).toMap ). foreach( map => { map.foreach{

entry => print(entry._1 +":" + entry._2 + ", ") } ;

print("\n-----------\n") } )

//get the row count

val count = hBaseRDD.count()

print("HBase RDD count:"+count)

5.如何使用Spark/Scala读取Hbase的数据

如何使用Spark/Scala读取Hbase的数据

必须使用高亮参数启动Spark-shell,否则当你遍历RDD时会出现如下的Exception

java.io.: org.apache.hadoop.hbase.io.

spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

以下代码,经过MaprDB实测通过

import org.apache.spark._

import org.apache.spark.rdd.NewHadoopRDD

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

import org.apache.hadoop.hbase.client.HBaseAdmin

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HColumnDescriptor

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.HTable;

val tableName = "/app//TRANSAC_ID"

val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, tableName)

//create rdd

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

classOf[org.apache.hadoop.hbase.io.],

classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.take(2).map(row=>row._2.rawCells).

map(_.map( kv => (new String(kv.getQualifier()) -> new

String(kv.getValue()) ) ).toMap ). foreach( map => { map.foreach{

entry => print(entry._1 +":" + entry._2 + ", ") } ;

print("\n-----------\n") } )

//get the row count

val count = hBaseRDD.count()

print("HBase RDD count:"+count)

6.spark 怎么实现对hbase分布式计算

由于spark提供的hbaseTest是scala版本,并没有提供java版。

我将scala版本改为java版本,并根据数据做了些计算操作。程序目的:查询出hbase满足条件的用户,统计各个等级个数。

代码如下,西面使用的hbase是0.94注释已经写详细:package com.sdyc.ndspark.sys;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.io.ByteArrayOutputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.Serializable;import java.util.List;/** * * * spark hbase 测试 * * Created with IntelliJ IDEA. * User: zhangdonghao * Date: 14-1-26 * Time: 上午9:24 * To change this template use File | Settings | File Templates. * * * @author zhangdonghao */public class HbaseTest implements Serializable { public Log log = LogFactory.getLog(HbaseTest.class); /** * 将scan编码,该方法copy自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil * * @param scan * @return * @throws IOException */ static String convertScanToString(Scan scan) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(out); scan.write(dos); return Base64.encodeBytes(out.toByteArray()); } public void start() { //初始化sparkContext,这里必须在jars参数里面放上Hbase的jar, // 否则会报unread block data异常 JavaSparkContext sc = new JavaSparkContext("spark://nowledgedata-n3:7077", "hbaseTest", "/home/hadoop/software/spark-0.8.1", new String[]{"target/ndspark.jar", "target\\dependency\\hbase-0.94.6.jar"}); //使用HBaseConfiguration.create()生成Configuration // 必须在项目classpath下放上hadoop以及hbase的配置文件。 Configuration conf = HBaseConfiguration.create(); //设置查询条件,这里值返回用户的等级 Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("195861-1035177490")); scan.setStopRow(Bytes.toBytes("195861-1072173147")); scan.addFamily(Bytes.toBytes("info")); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("levelCode")); try { //需要读取的hbase表名 String tableName = "usertable"; conf.set(TableInputFormat.INPUT_TABLE, tableName); conf.set(TableInputFormat.SCAN, convertScanToString(scan)); //获得hbase查询结果Result JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf, TableInputFormat.class, .class, Result.class); //从result中取出用户的等级,并且每一个算一次 JavaPairRDD levels = hBaseRDD.map( new PairFunction, Integer, Integer>() { @Override public Tuple2 call( Tuple2 ) throws Exception { byte[] o = ._2().getValue( Bytes.toBytes("info"), Bytes.toBytes("levelCode")); if (o != null) { return new Tuple2(Bytes.toInt(o), 1); } return null; } }); //数据累加 JavaPairRDD counts = levels.reduceByKey(new Function2() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //打印出最终结果 List> output = counts.collect(); for (Tuple2 tuple : output) { System.out.println(tuple._1 + ": " + tuple._2); } } catch (Exception e) { log.warn(e); } } /** * spark如果计算没写在main里面,实现的类必须继承Serializable接口, * >否则会报 Task not serializable: java.io. 异常 */ public static void main(String[] args) throws InterruptedException { new HbaseTest().start(); System.exit(0); }}注意:如果使用的是hbase0.96.1.1-函数需要改为:/** * 将scan编码,该方法copy自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil * * @param scan * @return * @throws IOException */static String convertScanToString(Scan scan) throws IOException { ClientProtos.Scan proto = ProtobufUtil.toScan(scan); return Base64.encodeBytes(proto.toByteArray());}运行结果如下:0: 2852811: 7084: 286562: 363156: 238488: 1980210: 69139: 159883: 319501: 388727: 216005: 2719012: 17。

7.spark 处理的数据存入hbase无反应

调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下: 1 JavaRDD myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。

其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。

8.spark

Configurationconf=HBaseConfiguration.create(); StringtableName="testTable"; Scanscan=newScan(); scan.setCaching(10000); scan.setCacheBlocks(false); conf.set(TableInputFormat.INPUT_TABLE,tableName); ClientProtos.Scanproto=ProtobufUtil.toScan(scan); StringScanToString=Base64.encodeBytes(proto.toByteArray()); conf.set(TableInputFormat.SCAN,ScanToString); JavaPairRDDmyRDD=sc .newAPIHadoopRDD(conf,TableInputFormat.class, .class,Result.class); 在Spark使用如上Hadoop提供的标准接口读取HBase表数据(全表读),读取5亿左右数据,要20M+,而同样的数据保存在Hive中,读取却只需要1M以内,性能差别非常大。

转载,仅供参考。

sparkpythonhbase

转载请注明出处代码入门网 » sparkpythonhbase(有没有Python写的spark连接Hbase的例子)

资讯

python文字编码转换(python如何将乱码转成汉字)

阅读(14)

本文主要为您介绍python文字编码转换,内容包括python字符编码转换,python如何将乱码转成汉字,python编码转换。python2与python3稍微有点区别2、python2中默认的字符编码格式都是unicode,在字符串前加u,表示unicode

资讯

python相除取整数(python为什么算除法自动取整了)

阅读(22)

本文主要为您介绍python相除取整数,内容包括python2.7.12中如何引用模块做整数相除,python2.7.12引用模块做整数除法,python除数向上取整怎么写,就是0.3取12.6取3这种,我5/10得到。在2.x版本都是这样,会自动取整。在3.x版本以上改了,结果保留

资讯

pythoncondapip(pip和conda到底有什么不一样)

阅读(16)

本文主要为您介绍pythoncondapip,内容包括pip和conda到底有什么不一样,pip和conda到底有什么不一样,pythonpip是什么。在python的世界里也浸淫多年了,我们早已习惯有 pip ,easy_install 和virtualenv的世界,但是这些

资讯

python绘制波形图(matplotlib怎样绘制波形图)

阅读(18)

本文主要为您介绍python绘制波形图,内容包括matplotlib怎样绘制波形图,matplotlib怎样绘制波形图,python问题求解我现在通过编程生成一个波形文件,怎么获取第一秒之。可以选择ActivePython,然后用命令 pypm -g install matplotlib安装模块。

资讯

ugpython(python和activepython)

阅读(12)

本文主要为您介绍ugpython,内容包括Python、Ruby、Java、C++、UG有法语、德语、俄语版本的吗?,pythongraphtool怎么使用,python和activepython。activepython是红帽子的那家公司自己定制的。 python通常是指c-python,就是说用C语言为主写的

资讯

阿里云python3(阿里云虚拟主机可以部署python代码吗)

阅读(34)

本文主要为您介绍阿里云python3,内容包括阿里云虚拟主机可以部署python代码吗,如何在“阿里云+Wdcp”上面搭建Python环境,各位大神,Python怎么调用阿里云API。一 正确的打开姿势1.按win+r然后输入cmd2.切换到程序所在的目录3.输入python 程

资讯

pythonraise用法(python中可以使用if作为变量名吗)

阅读(20)

本文主要为您介绍pythonraise用法,内容包括python中可以使用if作为变量名吗,Python新手,Python中except的用法和作用是什么?请指教,python当中的迭代器如何使用?举例说明,谢谢。不可以,if为python保留的字符下面的列表显示了在 Python 中的

资讯

python变量替换(python中的变量替换怎么使用)

阅读(17)

本文主要为您介绍python变量替换,内容包括python中的变量替换怎么使用,Python怎样用变量替换字符串?,怎样通过参数替换python脚本里的变量值。1. 使用连接符: +world = "World"print "Hello " + world + " ! "2. 使用占位符来内插w

资讯

pythonsslurllib(python报错urllib.error.URLError:)

阅读(16)

本文主要为您介绍pythonsslurllib,内容包括pythonurllib怎么用,pythonurllib,pythonhttplib2urllib区别。jango站点使用django_cas接入SSO(单点登录系统),配置完成后登录,抛出“urlopen error unknown ur

资讯

python新建对象(python中怎么用类创建对象)

阅读(20)

本文主要为您介绍python新建对象,内容包括python中怎么用类创建对象,python如何定义新对象,python如何定义新对象。class BaseObject(object):def __init__(self, **kwg):self.__dict__ = d

资讯

win32processpython(怎样使用python查询系统某一进程是否存在)

阅读(20)

本文主要为您介绍win32processpython,内容包括pythonwin32是什么?pythoncom如何理解,怎样使用python查询系统某一进程是否存在,怎样使用python查询系统某一进程是否存在。只需要一小段python代码,就可以解决用python查询判断系统进程是否存

资讯

pythontwisted广播(局域网如何互连?)

阅读(1)

本文主要为您介绍pythontwisted广播,内容包括pythonPyTorch用matmul写矩阵相乘函数时,怎么让函数可以广播?,局域网如何互连?,学习python能干什么???。网络互连的目的:是将多个网络互相连接,以实现在更大范围内的信息交换资源共享和协同工作。

资讯

python精要参考pudn(python四种基本数据类型)

阅读(1)

本文主要为您介绍python精要参考pudn,内容包括python四种基本数据类型,要学习python需要学习哪些知识?,零基础如何开始学习Python?如何入门?。python是相当高级的语言,基本数据类型就很多,不过参考其他语言的,一定要四个的话:整形 int定义:age=1

资讯

python百分数加减(python如何进行多项式的加减乘除)

阅读(1)

本文主要为您介绍python百分数加减,内容包括python中,如何计算变量与百分数的结果?,python如何进行多项式的加减乘除,扩句(有要求)将下面的句子扩展成一句话,。如何进行多项式的加减乘除?这个题目太大!12个字,写出来很简单,答案却是初中教材上

资讯

wsgipython开发(如何创建部署WSGI类型的Python应用)

阅读(1)

本文主要为您介绍wsgipython开发,内容包括如何创建部署WSGI类型的Python应用,如何创建部署WSGI类型的Python应用,如何部署pythonweb程序。第一部分:安装必要工具。1.因为这是部署Python开发环境,所以安装pip可以简化一些软件的安装过程。(PIP

资讯

python集成测试框架(最受欢迎的Python开源框架有哪些)

阅读(1)

本文主要为您介绍python集成测试框架,内容包括最受欢迎的Python开源框架,企业级软件开发需要什么样的框架,Python企业应用到底如何?。Django: Python Web应用开发框架Django 应该是最出名的Python框架,GAE甚至Erlang都有框架受它影响。D

资讯

python数据驱动测试(python中什么是测试数据和训练数据)

阅读(1)

本文主要为您介绍python数据驱动测试,内容包括python中什么是测试数据和训练数据,python学到什么程度就能满足测试需要,如何用python抓取js生成的数据:。当数据量特别大的时候,有几千几万条,为了验证模型的好坏,取出一部分用于训练,另一部分用

资讯

python函数返回值为list(返回值是一个list的函数,该怎么写)

阅读(1)

本文主要为您介绍python函数返回值为list,内容包括python设计一个名为listsum的函数,接收一个数字的列表,返回积累,python函数如何返回多个变量,c/c++调用python中返回值为list的函数。1.你先定义个实体类(当中的字段要跟table里一一对应,类

资讯

pythondatetime相加(python中datetime怎么用)

阅读(1)

本文主要为您介绍pythondatetime相加,内容包括python时间相加函数,python中datetime怎么用,python获取系统时间有问题(时区?)写了一段python代码。 日期相关的操作 from datetime import datetime from datetime import timedel