欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

抽取oracle数据到mysql数据库的实现过程

程序员文章站 2022-06-22 09:13:48
在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款...

在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录etl_dir

3、运行oracle数据库程序p_etl_ora_data,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下

load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";

附:数据库脚本p_etl_ora_data

create or replace procedure p_etl_ora_data
(
  p_ora_dir  varchar2,
  p_data_path varchar2
) is
  type t_rec is record(
    tbn varchar2(40),
    whr varchar2(4000));
  type t_tabs is table of t_rec;
  v_tabs   t_tabs := t_tabs();
  v_etl_dir  varchar2(40) := p_ora_dir;
  v_load_file utl_file.file_type;
  procedure etl_data
  (
    p_sql_stmt varchar2,
    p_data_path varchar2,
    p_tb_name  varchar2
  ) is
  begin
    declare
      v_var_col  varchar2(32767);
      v_num_col  number;
      v_date_col date;
      v_tmz    timestamp;
      v_cols   number;
      v_cols_desc dbms_sql.desc_tab;
      v_row_str  varchar2(32767);
      v_col_str  varchar2(32767);
      v_sql_id  number;
      v_sql_ref  sys_refcursor;
      v_exp_file utl_file.file_type;
      v_data_path varchar2(200);
    begin
      v_data_path := p_data_path;
      if regexp_substr(v_data_path, '\\$') is null
      then
        v_data_path := v_data_path || '\';
      end if;
      v_data_path := replace(v_data_path, '\', '\\');
      open v_sql_ref for p_sql_stmt;
      v_sql_id := dbms_sql.to_cursor_number(v_sql_ref);
      dbms_sql.describe_columns(v_sql_id, v_cols, v_cols_desc);
      for i in v_cols_desc.first .. v_cols_desc.last
      loop
        case
          when v_cols_desc(i).col_type in (1, 9, 96) then
            dbms_sql.define_column(v_sql_id, i, v_var_col, 32767);
          when v_cols_desc(i).col_type = 2 then
            dbms_sql.define_column(v_sql_id, i, v_num_col);
          when v_cols_desc(i).col_type = 12 then
            dbms_sql.define_column(v_sql_id, i, v_date_col);
          when v_cols_desc(i).col_type = 180 then
            dbms_sql.define_column(v_sql_id, i, v_tmz);
        end case;
      end loop;
      declare
        v_flush_over pls_integer := 1;
        v_file_over pls_integer := 1;
        v_file_no  pls_integer := 1;
        v_file_name varchar2(200);
        v_line    varchar2(400);
      begin
        while dbms_sql.fetch_rows(v_sql_id) > 0
        loop
          if v_file_over = 1
          then
            v_file_name := p_tb_name || '_' || v_file_no || '.csv';
            v_exp_file := utl_file.fopen(v_etl_dir, v_file_name, open_mode => 'w', max_linesize => 32767);
          end if;
          v_row_str := '';
          for i in 1 .. v_cols
          loop
            v_col_str := '\n';
            begin
              case
                when v_cols_desc(i).col_type in (1, 9, 96) then
                  dbms_sql.column_value(v_sql_id, i, v_var_col);
                  if v_var_col is not null
                  then
                    v_col_str := '^' || v_var_col || '^';
                  end if;
                when v_cols_desc(i).col_type = 2 then
                  dbms_sql.column_value(v_sql_id, i, v_num_col);
                  if v_num_col is not null
                  then
                    v_col_str := v_num_col;
                  end if;
                when v_cols_desc(i).col_type = 12 then
                  dbms_sql.column_value(v_sql_id, i, v_date_col);
                  if v_date_col is not null
                  then
                    v_col_str := '^' || to_char(v_date_col, 'yyyy-mm-dd hh24:mi:ss') || '^';
                  end if;
                when v_cols_desc(i).col_type in (180, 181, 231) then
                  dbms_sql.column_value(v_sql_id, i, v_tmz);
                  if v_tmz is not null
                  then
                    v_col_str := '^' || to_char(v_tmz, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
                  end if;
              end case;
              if i = 1
              then
                v_row_str := v_col_str;
              else
                v_row_str := v_row_str || ',' || v_col_str;
              end if;
            end;
          end loop;
          utl_file.put_line(v_exp_file, convert(v_row_str, 'utf8'));
          if v_file_over > 200000 /*每200000条记录就产生一个新的文件*/
          then
            v_file_over := 1;
            v_flush_over := 1;
            v_file_no  := v_file_no + 1;
            utl_file.fclose(v_exp_file);
            v_line := 'load data infile "' || v_data_path || v_file_name || '" into table ' || p_tb_name;
            v_line := v_line || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
            utl_file.put_line(v_load_file, v_line);
            utl_file.fflush(v_load_file);
            continue;
          end if;
          v_file_over := v_file_over + 1;
          if v_flush_over > 2000 /*每2000条记录就刷新缓存,写到文件中 */
          then
            utl_file.fflush(v_exp_file);
            v_flush_over := 1;
          else
            v_flush_over := v_flush_over + 1;
          end if;
        end loop;
        dbms_sql.close_cursor(v_sql_id);
        if utl_file.is_open(v_exp_file)
        then
          utl_file.fclose(v_exp_file);
          v_line := 'load data infile "' || v_data_path || v_file_name || '" into table ' || p_tb_name;
          v_line := v_line || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
          utl_file.put_line(v_load_file, v_line);
          utl_file.fflush(v_load_file);
        end if;
      end;
    exception
      when others then
        if dbms_sql.is_open(v_sql_id)
        then
          dbms_sql.close_cursor(v_sql_id);
        end if;
        if utl_file.is_open(v_exp_file)
        then
          utl_file.fclose(v_exp_file);
        end if;
        dbms_output.put_line(sqlerrm);
        dbms_output.put_line(p_sql_stmt);
    end;
  end;
begin
  begin
    execute immediate 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
  exception
    when others then
      null;
  end;
  execute immediate 'truncate table mysql_etl_tbs';
  declare
    v_ci    pls_integer;
    v_cn    varchar2(40);
    v_etl_cols varchar2(32767);
    v_tbn   varchar2(30);
    v_etl_cfg varchar2(32767);
    v_cnf_file utl_file.file_type;
    v_from_pos pls_integer;
  begin
    v_cnf_file := utl_file.fopen(v_etl_dir, 'etl_tabs.cnf', 'r', 32767);
    loop
      utl_file.get_line(v_cnf_file, v_etl_cfg, 32767);
      v_from_pos := regexp_instr(v_etl_cfg, 'from', 1, 1, 0, 'i');
      v_etl_cols := substr(v_etl_cfg, 1, v_from_pos - 1);
      v_etl_cols := regexp_substr(v_etl_cols, '(select)(.+)', 1, 1, 'i', 2);
      v_tbn   := regexp_substr(v_etl_cfg, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2);
      v_tbn   := upper(v_tbn);
      v_tabs.extend();
      v_tabs(v_tabs.last).tbn := v_tbn;
      v_tabs(v_tabs.last).whr := regexp_substr(v_etl_cfg, '\s+where .+', 1, 1, 'i');
      v_ci := 1;
      loop
        v_cn := regexp_substr(v_etl_cols, '\s+', 1, v_ci);
        exit when v_cn is null;
        v_cn := upper(v_cn);
        execute immediate 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
          using v_tbn, v_cn, v_ci;
        commit;
        v_ci := v_ci + 1;
      end loop;
    end loop;
  exception
    when utl_file.invalid_path then
      dbms_output.put_line('指定的目录:etl_dir"' || '"无效!');
      return;
    when utl_file.invalid_filename then
      dbms_output.put_line('指定的文件:" etl_tabs.cnf' || '"无效!');
      return;
    when no_data_found then
      utl_file.fclose(v_cnf_file);
    when others then
      dbms_output.put_line(sqlerrm);
      return;
  end;
  declare
    v_cur_match  sys_refcursor;
    v_sql_smt   varchar2(32767);
    v_tn     varchar2(40);
    v_cn     varchar2(40);
    v_ci     pls_integer;
    v_column_name varchar2(40);
    v_etl_cols  varchar2(32767);
    v_line    varchar2(4000);
    v_tbn     varchar2(40);
  begin
    v_load_file := utl_file.fopen(v_etl_dir, 'load_data.sql', open_mode => 'w', max_linesize => 32767);
    for t_ix in v_tabs.first .. v_tabs.last
    loop
      v_sql_smt := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci';
      v_tbn   := v_tabs(t_ix).tbn;
      v_sql_smt := replace(v_sql_smt, ':tbn:', v_tbn);
      v_etl_cols := null;
      open v_cur_match for v_sql_smt;
      loop
        fetch v_cur_match
          into v_tn, v_cn, v_column_name, v_ci;
        exit when v_cur_match%notfound;
        if v_ci > 1
        then
          v_etl_cols := v_etl_cols || ' , ';
        end if;
        if v_column_name is null
        then
          v_etl_cols := v_etl_cols || ' cast(null as number) ' || v_cn;
        else
          v_etl_cols := v_etl_cols || v_cn;
        end if;
      end loop;
      close v_cur_match;
      v_tbn   := lower(v_tbn);
      v_sql_smt := 'select ' || v_etl_cols || ' from ' || v_tbn || v_tabs(t_ix).whr;
      etl_data(v_sql_smt, p_data_path, v_tbn);
    end loop;
    if utl_file.is_open(v_load_file)
    then
      utl_file.fclose(v_load_file);
    end if;
  end;
end p_etl_ora_data;

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。如果你想了解更多相关内容请查看下面相关链接